summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp39
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp372
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp237
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.h154
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.cpp34
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token_test.cpp14
-rw-r--r--src/mongo/db/pipeline/mongo_process_interface.h8
-rw-r--r--src/mongo/db/pipeline/mongos_process_interface.h4
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.cpp25
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.h4
-rw-r--r--src/mongo/db/pipeline/resume_token.cpp26
-rw-r--r--src/mongo/db/pipeline/resume_token.h12
-rw-r--r--src/mongo/db/pipeline/resume_token_test.cpp22
-rw-r--r--src/mongo/db/pipeline/stub_mongo_process_interface.h4
-rw-r--r--src/mongo/db/repl/oplog_interface_mock.cpp4
-rw-r--r--src/mongo/db/transaction_history_iterator.cpp35
-rw-r--r--src/mongo/db/transaction_history_iterator.h14
17 files changed, 744 insertions, 264 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 7a8451dab0a..20c146a73ca 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -159,19 +159,46 @@ void DocumentSourceChangeStream::checkValueType(const Value v,
namespace {
/**
- * Constructs a filter matching 'applyOps' oplog entries that:
- * 1) Represent a committed transaction (i.e., not just the "prepare" part of a two-phase
- * transaction).
- * 2) Have sub-entries which should be returned in the change stream.
+ * Constructs a filter matching any 'applyOps' commands that commit a transaction. An 'applyOps'
+ * command implicitly commits a transaction if _both_ of the following are true:
+ * 1) it is not marked with the 'partialTxn' field, which would indicate that there are more entries
+ * to come in the transaction and
+ * 2) it is not marked with the 'prepare' field, which would indicate that the transaction is only
+ * committed if there is a follow-up 'commitTransaction' command in the oplog.
+ *
+ * This filter will ignore all but the last 'applyOps' command in a transaction comprising multiple
+ * 'applyOps' commands, and it will ignore all 'applyOps' commands in a prepared transaction. The
+ * change stream traverses back through the oplog to recover the ignored commands when it sees an
+ * entry that commits a transaction.
+ *
+ * As an optimization, this filter also ignores any transaction with just a single 'applyOps' if
+ * that 'applyOps' does not contain any updates that modify the namespace that the change stream is
+ * watching.
*/
BSONObj getTxnApplyOpsFilter(BSONElement nsMatch, const NamespaceString& nss) {
BSONObjBuilder applyOpsBuilder;
applyOpsBuilder.append("op", "c");
+
+ // "o.applyOps" must be an array with at least one element
+ applyOpsBuilder.append("o.applyOps.0", BSON("$exists" << true));
applyOpsBuilder.append("lsid", BSON("$exists" << true));
applyOpsBuilder.append("txnNumber", BSON("$exists" << true));
applyOpsBuilder.append("o.prepare", BSON("$not" << BSON("$eq" << true)));
- const std::string& kApplyOpsNs = "o.applyOps.ns";
- applyOpsBuilder.appendAs(nsMatch, kApplyOpsNs);
+ applyOpsBuilder.append("o.partialTxn", BSON("$not" << BSON("$eq" << true)));
+ {
+ // Include this 'applyOps' if it has an operation with a matching namespace _or_ if it has a
+ // 'prevOpTime' link to another 'applyOps' command, indicating a multi-entry transaction.
+ BSONArrayBuilder orBuilder(applyOpsBuilder.subarrayStart("$or"));
+ {
+ {
+ BSONObjBuilder nsMatchBuilder(orBuilder.subobjStart());
+ nsMatchBuilder.appendAs(nsMatch, "o.applyOps.ns"_sd);
+ }
+ // The default repl::OpTime is the value used to indicate a null "prevOpTime" link.
+ orBuilder.append(BSON(repl::OplogEntry::kPrevWriteOpTimeInTransactionFieldName
+ << BSON("$ne" << repl::OpTime().toBSON())));
+ }
+ }
return applyOpsBuilder.obj();
}
} // namespace
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 7f093290eb1..016d7132283 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -53,6 +53,7 @@
#include "mongo/db/pipeline/value_comparator.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/transaction_history_iterator.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
@@ -75,8 +76,6 @@ using DSChangeStream = DocumentSourceChangeStream;
static const Timestamp kDefaultTs(100, 1);
static const repl::OpTime kDefaultOpTime(kDefaultTs, 1);
-static const Timestamp kPreparedTransactionTs(99, 1);
-static const repl::OpTime kPreparedTransactionOpTime(kPreparedTransactionTs, 1);
static const NamespaceString nss("unittests.change_stream");
static const BSONObj kDefaultSpec = fromjson("{$changeStream: {}}");
@@ -89,15 +88,47 @@ public:
struct MockMongoInterface final : public StubMongoProcessInterface {
+ // This mock iterator simulates a traversal of transaction history in the oplog by returning
+ // mock oplog entries from a list.
+ struct MockTransactionHistoryIterator : public TransactionHistoryIteratorBase {
+ bool hasNext() const final {
+ return (mockEntriesIt != mockEntries.end());
+ }
+
+ repl::OplogEntry next(OperationContext* opCtx) final {
+ ASSERT(hasNext());
+ return *(mockEntriesIt++);
+ }
+
+ repl::OpTime nextOpTime(OperationContext* opCtx) final {
+ ASSERT(hasNext());
+ return (mockEntriesIt++)->getOpTime();
+ }
+
+ std::vector<repl::OplogEntry> mockEntries;
+ std::vector<repl::OplogEntry>::const_iterator mockEntriesIt;
+ };
+
MockMongoInterface(std::vector<FieldPath> fields,
- boost::optional<repl::OplogEntry> preparedTransaction = {})
- : _fields(std::move(fields)), _preparedTransaction(preparedTransaction) {}
-
- // For tests of "commitTransaction" commands.
- repl::OplogEntry lookUpOplogEntryByOpTime(OperationContext* opCtx,
- repl::OpTime lookupTime) final {
- invariant(_preparedTransaction && (lookupTime == _preparedTransaction->getOpTime()));
- return *_preparedTransaction;
+ std::vector<repl::OplogEntry> transactionEntries = {})
+ : _fields(std::move(fields)), _transactionEntries(std::move(transactionEntries)) {}
+
+ // For tests of transactions that involve multiple oplog entries.
+ std::unique_ptr<TransactionHistoryIteratorBase> createTransactionHistoryIterator(
+ repl::OpTime time) const {
+ auto iterator = stdx::make_unique<MockTransactionHistoryIterator>();
+
+ // Simulate a lookup on the oplog timestamp by manually advancing the iterator until we
+ // reach the desired timestamp.
+ iterator->mockEntries = _transactionEntries;
+ ASSERT(iterator->mockEntries.size() > 0);
+ for (iterator->mockEntriesIt = iterator->mockEntries.begin();
+ iterator->mockEntriesIt->getOpTime() != time;
+ ++iterator->mockEntriesIt) {
+ ASSERT(iterator->mockEntriesIt != iterator->mockEntries.end());
+ }
+
+ return iterator;
}
// For "insert" tests.
@@ -107,7 +138,17 @@ struct MockMongoInterface final : public StubMongoProcessInterface {
}
std::vector<FieldPath> _fields;
- boost::optional<repl::OplogEntry> _preparedTransaction;
+
+ // Stores oplog entries associated with a commit operation, including the oplog entries that a
+ // real DocumentSourceChangeStream would not see, because they are marked with a "prepare" or
+ // "partialTxn" flag. When the DocumentSourceChangeStream sees the commit for the transaction,
+ // either an explicit "commitCommand" or an implicit commit represented by an "applyOps" that is
+ // not marked with the "prepare" or "partialTxn" flag, it uses a TransactionHistoryIterator to
+ // go back and look up these entries.
+ //
+ // These entries are stored in the order they would be returned by the
+ // TransactionHistoryIterator, which is the _reverse_ of the order they appear in the oplog.
+ std::vector<repl::OplogEntry> _transactionEntries;
};
class ChangeStreamStageTest : public ChangeStreamStageTestNoSetup {
@@ -129,12 +170,12 @@ public:
std::vector<FieldPath> docKeyFields = {},
const BSONObj& spec = kDefaultSpec,
const boost::optional<Document> expectedInvalidate = {},
- const boost::optional<repl::OplogEntry> preparedTransaction = {}) {
+ const std::vector<repl::OplogEntry> transactionEntries = {}) {
vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry.toBSON(), spec);
auto closeCursor = stages.back();
getExpCtx()->mongoProcessInterface =
- stdx::make_unique<MockMongoInterface>(docKeyFields, preparedTransaction);
+ stdx::make_unique<MockMongoInterface>(docKeyFields, transactionEntries);
auto next = closeCursor->getNext();
// Match stage should pass the doc down if expectedDoc is given.
@@ -218,11 +259,13 @@ public:
ImplicitValue uuid = Value(),
ImplicitValue docKey = Value(),
ResumeTokenData::FromInvalidate fromInvalidate =
- ResumeTokenData::FromInvalidate::kNotFromInvalidate) {
+ ResumeTokenData::FromInvalidate::kNotFromInvalidate,
+ size_t txnOpIndex = 0) {
ResumeTokenData tokenData;
tokenData.clusterTime = ts;
tokenData.documentKey = docKey;
tokenData.fromInvalidate = fromInvalidate;
+ tokenData.txnOpIndex = txnOpIndex;
if (!uuid.missing())
tokenData.uuid = uuid.getUuid();
return ResumeToken(tokenData).toDocument();
@@ -289,7 +332,9 @@ public:
boost::optional<UUID> uuid = testUuid(),
boost::optional<bool> fromMigrate = boost::none,
boost::optional<BSONObj> object2 = boost::none,
- boost::optional<repl::OpTime> opTime = boost::none) {
+ boost::optional<repl::OpTime> opTime = boost::none,
+ OperationSessionInfo sessionInfo = {},
+ boost::optional<repl::OpTime> prevOpTime = {}) {
long long hash = 1LL;
return repl::OplogEntry(opTime ? *opTime : kDefaultOpTime, // optime
hash, // hash
@@ -300,11 +345,11 @@ public:
repl::OplogEntry::kOplogVersion, // version
object, // o
object2, // o2
- {}, // sessionInfo
+ sessionInfo, // sessionInfo
boost::none, // upsert
boost::none, // wall clock time
boost::none, // statement id
- boost::none, // optime of previous write within same transaction
+ prevOpTime, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none); // post-image optime
}
@@ -962,21 +1007,19 @@ TEST_F(ChangeStreamStageTest, CommitCommandReturnsOperationsFromPreparedTransact
Document preparedApplyOps{
{"applyOps",
Value{std::vector<Document>{
- Document{{"op", "i"_sd},
- {"ns", nss.ns()},
- {"ui", testUuid()},
- {"o", Value{Document{{"_id", 123}}}}},
+ D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 123}}}}},
}}},
{"prepare", true},
};
+ repl::OpTime applyOpsOpTime(Timestamp(99, 1), 1);
auto preparedTransaction = makeOplogEntry(OpTypeEnum::kCommand,
nss.getCommandNS(),
preparedApplyOps.toBson(),
testUuid(),
boost::none, // fromMigrate
boost::none, // o2 field
- kPreparedTransactionOpTime);
+ applyOpsOpTime);
// Create an oplog entry representing the commit for the prepared transaction. The commit has a
// 'prevWriteOpTimeInTransaction' value that matches the 'preparedApplyOps' entry, which the
@@ -984,23 +1027,23 @@ TEST_F(ChangeStreamStageTest, CommitCommandReturnsOperationsFromPreparedTransact
OperationSessionInfo sessionInfo;
sessionInfo.setTxnNumber(1);
sessionInfo.setSessionId(makeLogicalSessionIdForTest());
- auto oplogEntry = repl::OplogEntry(
- kDefaultOpTime, // optime
- 1LL, // hash
- OpTypeEnum::kCommand, // opType
- nss.getCommandNS(), // namespace
- boost::none, // uuid
- boost::none, // fromMigrate
- repl::OplogEntry::kOplogVersion, // version
- BSON("commitTransaction" << 1), // o
- boost::none, // o2
- sessionInfo, // sessionInfo
- boost::none, // upsert
- boost::none, // wall clock time
- boost::none, // statement id
- kPreparedTransactionOpTime, // optime of previous write within same transaction
- boost::none, // pre-image optime
- boost::none); // post-image optime
+ auto oplogEntry =
+ repl::OplogEntry(kDefaultOpTime, // optime
+ 1LL, // hash
+ OpTypeEnum::kCommand, // opType
+ nss.getCommandNS(), // namespace
+ boost::none, // uuid
+ boost::none, // fromMigrate
+ repl::OplogEntry::kOplogVersion, // version
+ BSON("commitTransaction" << 1), // o
+ boost::none, // o2
+ sessionInfo, // sessionInfo
+ boost::none, // upsert
+ boost::none, // wall clock time
+ boost::none, // statement id
+ applyOpsOpTime, // optime of previous write within same transaction
+ boost::none, // pre-image optime
+ boost::none); // post-image optime
// When the DocumentSourceChangeStreamTransform sees the "commitTransaction" oplog entry, we
// expect it to return the insert op within our 'preparedApplyOps' oplog entry.
@@ -1015,7 +1058,256 @@ TEST_F(ChangeStreamStageTest, CommitCommandReturnsOperationsFromPreparedTransact
{DSChangeStream::kDocumentKeyField, D{}},
};
- checkTransformation(oplogEntry, expectedResult, {}, kDefaultSpec, {}, preparedTransaction);
+ checkTransformation(oplogEntry, expectedResult, {}, kDefaultSpec, {}, {preparedTransaction});
+}
+
+TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) {
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setTxnNumber(1);
+ sessionInfo.setSessionId(makeLogicalSessionIdForTest());
+
+ // Create two applyOps entries that together represent a whole transaction.
+ repl::OpTime applyOpsOpTime1(Timestamp(100, 1), 1);
+ Document applyOps1{
+ {"applyOps",
+ V{std::vector<Document>{
+ D{{"op", "i"_sd},
+ {"ns", nss.ns()},
+ {"ui", testUuid()},
+ {"o", V{Document{{"_id", 123}}}}},
+ D{{"op", "i"_sd},
+ {"ns", nss.ns()},
+ {"ui", testUuid()},
+ {"o", V{Document{{"_id", 456}}}}},
+ }}},
+ {"partialTxn", true},
+ };
+
+ auto transactionEntry1 = makeOplogEntry(OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ applyOps1.toBson(),
+ testUuid(),
+ boost::none, // fromMigrate
+ boost::none, // o2 field
+ applyOpsOpTime1,
+ sessionInfo,
+ repl::OpTime());
+
+ repl::OpTime applyOpsOpTime2(Timestamp(100, 2), 1);
+ Document applyOps2{
+ {"applyOps",
+ V{std::vector<Document>{
+ D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 789}}}}},
+ }}},
+ /* The absence of the "partialTxn" and "prepare" fields indicates that this command commits
+ the transaction. */
+ };
+
+ auto transactionEntry2 = makeOplogEntry(OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ applyOps2.toBson(),
+ testUuid(),
+ boost::none, // fromMigrate
+ boost::none, // o2 field
+ applyOpsOpTime2,
+ sessionInfo,
+ applyOpsOpTime1);
+
+ // We do not use the checkTransformation() pattern that other tests use since we expect multiple
+ // documents to be returned from one applyOps.
+ auto stages = makeStages(transactionEntry2);
+ auto transform = stages[2].get();
+ invariant(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform) != nullptr);
+
+ // Populate the MockTransactionHistoryIterator in reverse chronological order.
+ getExpCtx()->mongoProcessInterface = stdx::make_unique<MockMongoInterface>(
+ std::vector<FieldPath>{},
+ std::vector<repl::OplogEntry>{transactionEntry2, transactionEntry1});
+
+ // We should get three documents from the change stream, based on the documents in the two
+ // applyOps entries.
+ auto next = transform->getNext();
+ ASSERT(next.isAdvanced());
+ auto nextDoc = next.releaseDocument();
+ ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber());
+ ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(),
+ DSChangeStream::kInsertOpType);
+ ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 123);
+ ASSERT_EQ(
+ nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0);
+ auto resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument();
+ ASSERT_DOCUMENT_EQ(resumeToken,
+ makeResumeToken(applyOpsOpTime2.getTimestamp(),
+ testUuid(),
+ V{D{}},
+ ResumeTokenData::FromInvalidate::kNotFromInvalidate,
+ 0));
+
+ next = transform->getNext();
+ ASSERT(next.isAdvanced());
+ nextDoc = next.releaseDocument();
+ ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber());
+ ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(),
+ DSChangeStream::kInsertOpType);
+ ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 456);
+ ASSERT_EQ(
+ nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0);
+ resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument();
+ ASSERT_DOCUMENT_EQ(resumeToken,
+ makeResumeToken(applyOpsOpTime2.getTimestamp(),
+ testUuid(),
+ V{D{}},
+ ResumeTokenData::FromInvalidate::kNotFromInvalidate,
+ 1));
+
+ next = transform->getNext();
+ ASSERT(next.isAdvanced());
+ nextDoc = next.releaseDocument();
+ ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber());
+ ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(),
+ DSChangeStream::kInsertOpType);
+ ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 789);
+ ASSERT_EQ(
+ nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0);
+ resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument();
+ ASSERT_DOCUMENT_EQ(resumeToken,
+ makeResumeToken(applyOpsOpTime2.getTimestamp(),
+ testUuid(),
+ V{D{}},
+ ResumeTokenData::FromInvalidate::kNotFromInvalidate,
+ 2));
+}
+
+TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) {
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setTxnNumber(1);
+ sessionInfo.setSessionId(makeLogicalSessionIdForTest());
+
+ // Create two applyOps entries that together represent a whole transaction.
+ repl::OpTime applyOpsOpTime1(Timestamp(99, 1), 1);
+ Document applyOps1{
+ {"applyOps",
+ V{std::vector<Document>{
+ D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 123}}}}},
+ D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 456}}}}},
+ }}},
+ {"partialTxn", true},
+ };
+
+ auto transactionEntry1 = makeOplogEntry(OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ applyOps1.toBson(),
+ testUuid(),
+ boost::none, // fromMigrate
+ boost::none, // o2 field
+ applyOpsOpTime1,
+ sessionInfo,
+ repl::OpTime());
+
+ repl::OpTime applyOpsOpTime2(Timestamp(99, 2), 1);
+ Document applyOps2{
+ {"applyOps",
+ V{std::vector<Document>{
+ D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 789}}}}},
+ }}},
+ {"prepare", true},
+ };
+
+ auto transactionEntry2 = makeOplogEntry(OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ applyOps2.toBson(),
+ testUuid(),
+ boost::none, // fromMigrate
+ boost::none, // o2 field
+ applyOpsOpTime2,
+ sessionInfo,
+ applyOpsOpTime1);
+
+ // Create an oplog entry representing the commit for the prepared transaction.
+ auto commitEntry =
+ repl::OplogEntry(kDefaultOpTime, // optime
+ 1LL, // hash
+ OpTypeEnum::kCommand, // opType
+ nss.getCommandNS(), // namespace
+ boost::none, // uuid
+ boost::none, // fromMigrate
+ repl::OplogEntry::kOplogVersion, // version
+ BSON("commitTransaction" << 1), // o
+ boost::none, // o2
+ sessionInfo, // sessionInfo
+ boost::none, // upsert
+ boost::none, // wall clock time
+ boost::none, // statement id
+ applyOpsOpTime2, // optime of previous write within same transaction
+ boost::none, // pre-image optime
+ boost::none); // post-image optime
+
+ // We do not use the checkTransformation() pattern that other tests use since we expect multiple
+ // documents to be returned from one applyOps.
+ auto stages = makeStages(commitEntry);
+ auto transform = stages[2].get();
+ invariant(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform) != nullptr);
+
+ // Populate the MockTransactionHistoryIterator in reverse chronological order.
+ getExpCtx()->mongoProcessInterface = stdx::make_unique<MockMongoInterface>(
+ std::vector<FieldPath>{},
+ std::vector<repl::OplogEntry>{commitEntry, transactionEntry2, transactionEntry1});
+
+ // We should get three documents from the change stream, based on the documents in the two
+ // applyOps entries.
+ auto next = transform->getNext();
+ ASSERT(next.isAdvanced());
+ auto nextDoc = next.releaseDocument();
+ ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber());
+ ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(),
+ DSChangeStream::kInsertOpType);
+ ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 123);
+ ASSERT_EQ(
+ nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0);
+ auto resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument();
+ ASSERT_DOCUMENT_EQ(
+ resumeToken,
+ makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand.
+ testUuid(),
+ V{D{}},
+ ResumeTokenData::FromInvalidate::kNotFromInvalidate,
+ 0));
+
+ next = transform->getNext();
+ ASSERT(next.isAdvanced());
+ nextDoc = next.releaseDocument();
+ ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber());
+ ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(),
+ DSChangeStream::kInsertOpType);
+ ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 456);
+ ASSERT_EQ(
+ nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0);
+ resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument();
+ ASSERT_DOCUMENT_EQ(
+ resumeToken,
+ makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand.
+ testUuid(),
+ V{D{}},
+ ResumeTokenData::FromInvalidate::kNotFromInvalidate,
+ 1));
+
+ next = transform->getNext();
+ ASSERT(next.isAdvanced());
+ nextDoc = next.releaseDocument();
+ ASSERT_EQ(nextDoc[DSChangeStream::kTxnNumberField].getLong(), *sessionInfo.getTxnNumber());
+ ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(),
+ DSChangeStream::kInsertOpType);
+ ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 789);
+ ASSERT_EQ(
+ nextDoc["lsid"].getDocument().toBson().woCompare(sessionInfo.getSessionId()->toBSON()), 0);
+ resumeToken = ResumeToken::parse(nextDoc["_id"].getDocument()).toDocument();
+ ASSERT_DOCUMENT_EQ(
+ resumeToken,
+ makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand.
+ testUuid(),
+ V{D{}},
+ ResumeTokenData::FromInvalidate::kNotFromInvalidate,
+ 2));
}
TEST_F(ChangeStreamStageTest, TransformApplyOps) {
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 1220286da9e..0773a2fb347 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -141,19 +141,15 @@ ResumeTokenData DocumentSourceChangeStreamTransform::getResumeToken(Value ts,
Value uuid,
Value documentKey) {
ResumeTokenData resumeTokenData;
- if (_txnContext) {
+ if (_txnIterator) {
// We're in the middle of unwinding an 'applyOps'.
// Use the clusterTime from the higher level applyOps
- resumeTokenData.clusterTime = _txnContext->clusterTime;
-
- // 'pos' points to the _next_ applyOps index, so we must subtract one to get the index of
- // the entry being examined right now.
- invariant(_txnContext->pos >= 1);
- resumeTokenData.applyOpsIndex = _txnContext->pos - 1;
+ resumeTokenData.clusterTime = _txnIterator->clusterTime();
+ resumeTokenData.txnOpIndex = _txnIterator->txnOpIndex();
} else {
resumeTokenData.clusterTime = ts.getTimestamp();
- resumeTokenData.applyOpsIndex = 0;
+ resumeTokenData.txnOpIndex = 0;
}
resumeTokenData.documentKey = documentKey;
@@ -328,10 +324,10 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
auto resumeToken = ResumeToken(resumeTokenData).toDocument();
// Add some additional fields only relevant to transactions.
- if (_txnContext) {
+ if (_txnIterator) {
doc.addField(DocumentSourceChangeStream::kTxnNumberField,
- Value(static_cast<long long>(_txnContext->txnNumber)));
- doc.addField(DocumentSourceChangeStream::kLsidField, Value(_txnContext->lsid));
+ Value(static_cast<long long>(_txnIterator->txnNumber())));
+ doc.addField(DocumentSourceChangeStream::kLsidField, Value(_txnIterator->lsid()));
}
doc.addField(DocumentSourceChangeStream::kIdField, Value(resumeToken));
@@ -409,55 +405,6 @@ DocumentSource::GetModPathsReturn DocumentSourceChangeStreamTransform::getModifi
return {DocumentSource::GetModPathsReturn::Type::kAllPaths, std::set<string>{}, {}};
}
-void DocumentSourceChangeStreamTransform::initializeTransactionContext(const Document& input) {
- // The only two commands we will see here are an applyOps or a commit, which both mean we
- // need to open a "transaction context" representing a group of updates that all occurred at
- // once as part of a transaction. If we already have a transaction context open, that would
- // mean we are looking at an applyOps or commit nested within an applyOps, which is not
- // allowed in the oplog.
- invariant(!_txnContext);
-
- Value lsid = input["lsid"];
- checkValueType(lsid, "lsid", BSONType::Object);
-
- Value txnNumber = input["txnNumber"];
- checkValueType(txnNumber, "txnNumber", BSONType::NumberLong);
-
- Value ts = input[repl::OplogEntry::kTimestampFieldName];
- Timestamp txnApplyTime = ts.getTimestamp();
-
- auto commandObj = input["o"].getDocument();
- Value applyOps = commandObj["applyOps"];
- if (!applyOps.missing()) {
- // An "applyOps" command represents an immediately-committed transaction. We place the
- // operations within the "applyOps" array directly into the transaction context.
- applyOps = input.getNestedField("o.applyOps");
- } else {
- invariant(!commandObj["commitTransaction"].missing());
-
- // A "commit" command is the second part of a transaction that has been split up into
- // two oplog entries. The lsid, txnNumber, and timestamp are in this entry, but the
- // "applyOps" array is in a previous entry, which we must look up.
- repl::OpTime opTime;
- uassertStatusOK(bsonExtractOpTimeField(input.toBson(), "prevOpTime", &opTime));
-
- auto applyOpsEntry =
- pExpCtx->mongoProcessInterface->lookUpOplogEntryByOpTime(pExpCtx->opCtx, opTime);
- invariant(applyOpsEntry.isCommand() &&
- (repl::OplogEntry::CommandType::kApplyOps == applyOpsEntry.getCommandType()));
- invariant(applyOpsEntry.shouldPrepare());
-
- auto bsonOp = applyOpsEntry.getOperationToApply();
- invariant(BSONType::Array == bsonOp["applyOps"].type());
- applyOps = Value(bsonOp["applyOps"]);
- }
-
- checkValueType(applyOps, "applyOps", BSONType::Array);
- invariant(applyOps.getArrayLength() > 0);
-
- _txnContext.emplace(applyOps, txnApplyTime, lsid.getDocument(), txnNumber.getLong());
-}
-
DocumentSource::GetNextResult DocumentSourceChangeStreamTransform::getNext() {
pExpCtx->checkForInterrupt();
@@ -469,10 +416,11 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamTransform::getNext() {
while (1) {
// If we're unwinding an 'applyOps' from a transaction, check if there are any documents we
// have stored that can be returned.
- if (_txnContext) {
- if (auto next = extractNextApplyOpsEntry()) {
+ if (_txnIterator) {
+ if (auto next = _txnIterator->getNextTransactionOp(pExpCtx->opCtx)) {
return applyTransformation(*next);
}
+ _txnIterator = boost::none;
}
// Get the next input document.
@@ -498,16 +446,103 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamTransform::getNext() {
return applyTransformation(doc);
}
- initializeTransactionContext(doc);
+ // The only two commands we will see here are an applyOps or a commit, which both mean we
+ // need to open a "transaction context" representing a group of updates that all occurred at
+ // once as part of a transaction. If we already have a transaction context open, that would
+ // mean we are looking at an applyOps or commit nested within an applyOps, which is not
+ // allowed in the oplog.
+ invariant(!_txnIterator);
+ _txnIterator.emplace(pExpCtx->opCtx, pExpCtx->mongoProcessInterface, doc, *_nsRegex);
+
+ // Once we initialize the transaction iterator, we can loop back to the top in order to call
+ // 'getNextTransactionOp' on it. Note that is possible for the transaction iterator
+ // to be empty of any relevant operations, meaning that this loop may need to execute
+ // multiple times before it encounters a relevant change to return.
+ }
+}
+
+DocumentSourceChangeStreamTransform::TransactionOpIterator::TransactionOpIterator(
+ OperationContext* opCtx,
+ std::shared_ptr<MongoProcessInterface> mongoProcessInterface,
+ const Document& input,
+ const pcrecpp::RE& nsRegex)
+ : _mongoProcessInterface(mongoProcessInterface), _nsRegex(nsRegex) {
+ Value lsidValue = input["lsid"];
+ checkValueType(lsidValue, "lsid", BSONType::Object);
+ _lsid = lsidValue.getDocument();
+
+ Value txnNumberValue = input["txnNumber"];
+ checkValueType(txnNumberValue, "txnNumber", BSONType::NumberLong);
+ _txnNumber = txnNumberValue.getLong();
+
+ // We want to parse the OpTime out of this document using the BSON OpTime parser. Instead of
+ // converting the entire Document back to BSON, we convert only the fields we need.
+ repl::OpTime txnOpTime = repl::OpTime::parse(BSON(repl::OpTime::kTimestampFieldName
+ << input[repl::OpTime::kTimestampFieldName]
+ << repl::OpTime::kTermFieldName
+ << input[repl::OpTime::kTermFieldName]));
+ _clusterTime = txnOpTime.getTimestamp();
+
+ auto commandObj = input["o"].getDocument();
+ Value applyOps = commandObj["applyOps"];
+
+ if (!applyOps.missing()) {
+ // We found an applyOps that implicitly commits a transaction. We include it in the
+ // '_txnOplogEntries' stack of applyOps entries that the change stream should process as
+ // part of this transaction. There may be additional applyOps entries linked through the
+ // 'prevOpTime' field, which will also get added to '_txnOplogEntries' later in this
+ // function. Note that this style of transaction does not have a 'commitTransaction'
+ // command.
+ _txnOplogEntries.push(txnOpTime);
+ } else {
+ // This must be a "commitTransaction" command, which commits a prepared transaction. This
+ // style of transaction does not have an applyOps entry that implicitly commits it, as in
+ // the previous case. We're going to iterate through the other oplog entries in the
+ // transaction, but this entry does not have any updates in it, so we do not include it in
+ // the '_txnOplogEntries' stack.
+ invariant(!commandObj["commitTransaction"].missing());
+ }
- // Once we initialize the transaction context, we can loop back to the top in order to call
- // 'extractNextApplyOpsEntry' on it. Note that is possible for the transaction context to be
- // empty of any relevant operations, meaning that this loop may need to execute multiple
- // times before it encounters a relevant change to return.
+ if (BSONType::Object ==
+ input[repl::OplogEntry::kPrevWriteOpTimeInTransactionFieldName].getType()) {
+ // As with the 'txnOpTime' parsing above, we convert a portion of 'input' back to BSON in
+ // order to parse an OpTime, this time from the "prevOpTime" field.
+ repl::OpTime prevOpTime = repl::OpTime::parse(
+ input[repl::OplogEntry::kPrevWriteOpTimeInTransactionFieldName].getDocument().toBson());
+ _collectAllOpTimesFromTransaction(opCtx, prevOpTime);
}
+
+ // Pop the first OpTime off the stack and use it to load the first oplog entry into the
+ // '_currentApplyOps' field.
+ invariant(_txnOplogEntries.size() > 0);
+ const auto firstTimestamp = _txnOplogEntries.top();
+ _txnOplogEntries.pop();
+
+ if (firstTimestamp == txnOpTime) {
+ // This transaction consists of only one oplog entry, from which we have already extracted
+ // the "applyOps" array, so there is no need to do any more work.
+ invariant(_txnOplogEntries.size() == 0);
+ _currentApplyOps = std::move(applyOps);
+ } else {
+ // This transaction consists of multiple oplog entries; grab the chronologically first entry
+ // and extract its "applyOps" array.
+ auto firstApplyOpsEntry = _lookUpOplogEntryByOpTime(opCtx, firstTimestamp);
+
+ auto bsonOp = firstApplyOpsEntry.getOperationToApply();
+ invariant(BSONType::Array == bsonOp["applyOps"].type());
+ _currentApplyOps = Value(bsonOp["applyOps"]);
+ }
+
+ checkValueType(_currentApplyOps, "applyOps", BSONType::Array);
+ invariant(_currentApplyOps.getArrayLength() > 0);
+
+ // Initialize iterators at the beginning of the transaction.
+ _currentApplyOpsIt = _currentApplyOps.getArray().begin();
+ _txnOpIndex = 0;
}
-bool DocumentSourceChangeStreamTransform::isDocumentRelevant(const Document& d) {
+bool DocumentSourceChangeStreamTransform::TransactionOpIterator::_isDocumentRelevant(
+ const Document& d) const {
invariant(
d["op"].getType() == BSONType::String,
str::stream()
@@ -519,21 +554,71 @@ bool DocumentSourceChangeStreamTransform::isDocumentRelevant(const Document& d)
Value nsField = d["ns"];
invariant(!nsField.missing());
- return _nsRegex->PartialMatch(nsField.getString());
+ return _nsRegex.PartialMatch(nsField.getString());
}
-boost::optional<Document> DocumentSourceChangeStreamTransform::extractNextApplyOpsEntry() {
+boost::optional<Document>
+DocumentSourceChangeStreamTransform::TransactionOpIterator::getNextTransactionOp(
+ OperationContext* opCtx) {
+ while (true) {
+ while (_currentApplyOpsIt != _currentApplyOps.getArray().end()) {
+ Document d = (_currentApplyOpsIt++)->getDocument();
+ ++_txnOpIndex;
+ if (_isDocumentRelevant(d)) {
+ return d;
+ }
+ }
- while (_txnContext && _txnContext->pos < _txnContext->arr.size()) {
- Document d = _txnContext->arr[_txnContext->pos++].getDocument();
- if (isDocumentRelevant(d)) {
- return d;
+ if (_txnOplogEntries.empty()) {
+ // There are no more operations in this transaction.
+ return boost::none;
}
+
+ // We've processed all the operations in the previous applyOps entry, but we have a new one
+ // to process.
+ auto applyOpsEntry = _lookUpOplogEntryByOpTime(opCtx, _txnOplogEntries.top());
+ _txnOplogEntries.pop();
+
+ auto bsonOp = applyOpsEntry.getOperationToApply();
+ invariant(BSONType::Array == bsonOp["applyOps"].type());
+
+ _currentApplyOps = Value(bsonOp["applyOps"]);
+ _currentApplyOpsIt = _currentApplyOps.getArray().begin();
}
+}
+
+repl::OplogEntry
+DocumentSourceChangeStreamTransform::TransactionOpIterator::_lookUpOplogEntryByOpTime(
+ OperationContext* opCtx, repl::OpTime lookupTime) const {
+ invariant(!lookupTime.isNull());
+
+ std::unique_ptr<TransactionHistoryIteratorBase> iterator(
+ _mongoProcessInterface->createTransactionHistoryIterator(lookupTime));
+ try {
+ return iterator->next(opCtx);
+ } catch (ExceptionFor<ErrorCodes::IncompleteTransactionHistory>& ex) {
+ ex.addContext(
+ "Oplog no longer has history necessary for $changeStream to observe operations from a "
+ "committed transaction.");
+ uasserted(ErrorCodes::ChangeStreamHistoryLost, ex.reason());
+ }
+}
- _txnContext = boost::none;
+void DocumentSourceChangeStreamTransform::TransactionOpIterator::_collectAllOpTimesFromTransaction(
+ OperationContext* opCtx, repl::OpTime firstOpTime) {
+ std::unique_ptr<TransactionHistoryIteratorBase> iterator(
+ _mongoProcessInterface->createTransactionHistoryIterator(firstOpTime));
- return boost::none;
+ try {
+ while (iterator->hasNext()) {
+ _txnOplogEntries.push(iterator->nextOpTime(opCtx));
+ }
+ } catch (ExceptionFor<ErrorCodes::IncompleteTransactionHistory>& ex) {
+ ex.addContext(
+ "Oplog no longer has history necessary for $changeStream to observe operations from a "
+ "committed transaction.");
+ uasserted(ErrorCodes::ChangeStreamHistoryLost, ex.reason());
+ }
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h
index 415dacac16d..c20f5864e67 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h
@@ -86,60 +86,116 @@ private:
};
/**
- * Represents the DocumentSource's state if it's currently reading from an 'applyOps' entry
- * which was created as part of a transaction.
+ * Represents the DocumentSource's state if it's currently reading from a transaction.
+ * Transaction operations are packed into 'applyOps' entries in the oplog.
+ *
+ * This iterator returns operations from a transaction that are relevant to the change stream in
+ * the same order they appear on the oplog (chronological order). Note that the
+ * TransactionHistoryIterator, which this class uses to query the oplog, returns the oplog
+ * entries in _reverse_ order. We internally reverse the output of the
+ * TransactionHistoryIterator in order to get the desired order.
+ *
+     * Note that our view of a transaction in the oplog is like an array of arrays with an "outer"
+     * array of applyOps entries represented by the '_txnOplogEntries' field and "inner" arrays of
+     * operations within each applyOps entry. Each applyOps entry gets loaded on demand, with only
+     * a single applyOps loaded into '_currentApplyOps' at any time.
+     *
+     * Likewise, there are "outer" and "inner" positions, the '_txnOplogEntries' stack and the
+     * '_currentApplyOpsIt' iterator respectively, that together reference the current transaction
+     * operation.
*/
- struct TransactionContext {
- TransactionContext(const TransactionContext&) = delete;
- TransactionContext& operator=(const TransactionContext&) = delete;
-
- // The array of oplog entries from an 'applyOps' representing the transaction. Only kept
- // around so that the underlying memory of 'arr' isn't freed.
- Value opArray;
-
- // Array representation of the 'opArray' field. Stored like this to avoid re-typechecking
- // each call to next(), or copying the entire array.
- const std::vector<Value>& arr;
-
- // Our current place in the 'opArray'.
- size_t pos;
-
- // The clusterTime of the applyOps.
- Timestamp clusterTime;
-
- // Fields that were taken from the 'applyOps' oplog entry.
- Document lsid;
- TxnNumber txnNumber;
-
- TransactionContext(const Value& applyOpsVal,
- Timestamp ts,
- const Document& lsidDoc,
- TxnNumber n)
- : opArray(applyOpsVal),
- arr(opArray.getArray()),
- pos(0),
- clusterTime(ts),
- lsid(lsidDoc),
- txnNumber(n) {}
+ class TransactionOpIterator {
+ public:
+ TransactionOpIterator(const TransactionOpIterator&) = delete;
+ TransactionOpIterator& operator=(const TransactionOpIterator&) = delete;
+
+ TransactionOpIterator(OperationContext* opCtx,
+ std::shared_ptr<MongoProcessInterface> mongoProcessInterface,
+ const Document& input,
+ const pcrecpp::RE& nsRegex);
+
+ // Returns the index for the last operation returned by getNextTransactionOp(). It is
+ // illegal to call this before calling getNextTransactionOp() at least once.
+ size_t txnOpIndex() const {
+            // '_txnOpIndex' points to the _next_ transaction index, so we must subtract one to get
+            // the index of the entry being examined right now.
+ invariant(_txnOpIndex >= 1);
+ return _txnOpIndex - 1;
+ }
+
+ Timestamp clusterTime() const {
+ return _clusterTime;
+ }
+
+ Document lsid() const {
+ return _lsid;
+ }
+
+ TxnNumber txnNumber() const {
+ return _txnNumber;
+ }
+
+ // Extract one Document from the transaction and advance the iterator. Returns boost::none
+ // to indicate that there are no operations left.
+ boost::optional<Document> getNextTransactionOp(OperationContext* opCtx);
+
+ private:
+ // Perform a find on the oplog to find an OplogEntry by its OpTime.
+ repl::OplogEntry _lookUpOplogEntryByOpTime(OperationContext* opCtx,
+ repl::OpTime lookupTime) const;
+
+ // Helper for getNextTransactionOp(). Checks the namespace of the given document to see if
+ // it should be returned in the change stream.
+ bool _isDocumentRelevant(const Document& d) const;
+
+ // Traverse backwards through the oplog by starting at the entry at 'firstOpTime' and
+ // following "prevOpTime" links until reaching the terminal "prevOpTime" value, and push the
+ // OpTime value to '_txnOplogEntries' for each entry traversed, including the 'firstOpTime'
+ // entry. Note that we follow the oplog links _backwards_ through the oplog (i.e., in
+ // reverse chronological order) but because this is a stack, the iterator will process them
+        // in the opposite order, allowing iteration to proceed forwards and return operations in
+ // chronological order.
+ void _collectAllOpTimesFromTransaction(OperationContext* opCtx, repl::OpTime firstOpTime);
+
+ // This stack contains the timestamps for all oplog entries in this transaction that have
+ // yet to be processed by the iterator. Each time the TransactionOpIterator finishes
+ // iterating the contents of the '_currentApplyOps' array, it pops an entry off the stack
+ // and uses it to load the next applyOps entry in the '_currentApplyOps' array, meaning that
+ // the top entry is always the next entry to be processed. From top-to-bottom, the stack is
+ // ordered chronologically, in the same order as entries appear in the oplog.
+ std::stack<repl::OpTime> _txnOplogEntries;
+
+        // The '_currentApplyOps' field stores the applyOps array that the TransactionOpIterator is
+ // currently iterating.
+ Value _currentApplyOps;
+
+ // This iterator references the next operation within the '_currentApplyOps' array that the
+        // getNextTransactionOp() method will return. When there are no more operations to
+ // iterate, this iterator will point to the array's "end" sentinel, and '_txnOplogEntries'
+ // will be empty.
+ typename std::vector<Value>::const_iterator _currentApplyOpsIt;
+
+ // Our current place within the entire transaction, which may consist of multiple 'applyOps'
+ // arrays.
+ size_t _txnOpIndex;
+
+        // The clusterTime of the applyOps entry.
+ Timestamp _clusterTime;
+
+        // Fields that were taken from the 'applyOps' oplog entry.
+ Document _lsid;
+ TxnNumber _txnNumber;
+
+ // Used for traversing the oplog with TransactionHistoryInterface.
+ std::shared_ptr<MongoProcessInterface> _mongoProcessInterface;
+
+ // An operation is relevant to a change stream iff its namespace matches this regex.
+ const pcrecpp::RE& _nsRegex;
};
/**
* Helper used for determining what resume token to return.
*/
ResumeTokenData getResumeToken(Value ts, Value uuid, Value documentKey);
- void initializeTransactionContext(const Document& input);
-
- /**
- * Gets the next relevant applyOps entry that should be returned. If there is none, returns
- * empty document.
- */
- boost::optional<Document> extractNextApplyOpsEntry();
-
- /**
- * Helper for extractNextApplyOpsEntry(). Checks the namespace of the given document to see
- * if it should be returned in the change stream.
- */
- bool isDocumentRelevant(const Document& d);
BSONObj _changeStreamSpec;
@@ -151,8 +207,8 @@ private:
// boost::none, and an exact string equality check is used instead.
boost::optional<pcrecpp::RE> _nsRegex;
- // Represents if the current 'applyOps' we're unwinding, if any.
- boost::optional<TransactionContext> _txnContext;
+ // Represents the current transaction we're unwinding, if any.
+ boost::optional<TransactionOpIterator> _txnIterator;
// Set to true if this transformation stage can be run on the collectionless namespace.
bool _isIndependentOfAnyCollection;
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
index 3ab2ec80191..c3383dfb9d9 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -42,17 +42,17 @@ using ResumeStatus = DocumentSourceEnsureResumeTokenPresent::ResumeStatus;
// the client's resume token, ResumeStatus::kCheckNextDoc if it is older than the client's token,
// and ResumeToken::kCannotResume if it is more recent than the client's resume token (indicating
// that we will never see the token). If the resume token's documentKey contains only the _id field
-// while the pipeline documentKey contains additional fields, then the collection has become
-// sharded since the resume token was generated. In that case, we relax the requirements such that
-// only the timestamp, version, applyOpsIndex, UUID and documentKey._id need match. This remains
-// correct, since the only circumstances under which the resume token omits the shard key is if it
-// was generated either (1) before the collection was sharded, (2) after the collection was sharded
-// but before the primary shard became aware of that fact, implying that it was before the first
-// chunk moved off the shard, or (3) by a malicious client who has constructed their own resume
-// token. In the first two cases, we can be guaranteed that the _id is unique and the stream can
-// therefore be resumed seamlessly; in the third case, the worst that can happen is that some
-// entries are missed or duplicated. Note that the simple collation is used to compare the resume
-// tokens, and that we purposefully avoid the user's requested collation if present.
+// while the pipeline documentKey contains additional fields, then the collection has become sharded
+// since the resume token was generated. In that case, we relax the requirements such that only the
+// timestamp, version, txnOpIndex, UUID and documentKey._id need match. This remains correct, since
+// the only circumstances under which the resume token omits the shard key is if it was generated
+// either (1) before the collection was sharded, (2) after the collection was sharded but before the
+// primary shard became aware of that fact, implying that it was before the first chunk moved off
+// the shard, or (3) by a malicious client who has constructed their own resume token. In the first
+// two cases, we can be guaranteed that the _id is unique and the stream can therefore be resumed
+// seamlessly; in the third case, the worst that can happen is that some entries are missed or
+// duplicated. Note that the simple collation is used to compare the resume tokens, and that we
+// purposefully avoid the user's requested collation if present.
ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionContext>& expCtx,
const Document& documentFromResumedStream,
const ResumeTokenData& tokenDataFromClient) {
@@ -78,14 +78,14 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte
: ResumeStatus::kCheckNextDoc;
}
- if (tokenDataFromResumedStream.applyOpsIndex < tokenDataFromClient.applyOpsIndex) {
+ if (tokenDataFromResumedStream.txnOpIndex < tokenDataFromClient.txnOpIndex) {
return ResumeStatus::kCheckNextDoc;
- } else if (tokenDataFromResumedStream.applyOpsIndex > tokenDataFromClient.applyOpsIndex) {
- // This could happen if the client provided an applyOpsIndex of 0, yet the 0th document in
- // the applyOps was irrelevant (meaning it was an operation on a collection or DB not being
+ } else if (tokenDataFromResumedStream.txnOpIndex > tokenDataFromClient.txnOpIndex) {
+ // This could happen if the client provided a txnOpIndex of 0, yet the 0th document in the
+ // applyOps was irrelevant (meaning it was an operation on a collection or DB not being
// watched). If we are looking for the resume token on a shard then this simply means that
// the resume token may be on a different shard; otherwise, it indicates a corrupt token.
- uassert(50792, "Invalid resumeToken: applyOpsIndex was skipped", expCtx->needsMerge);
+ uassert(50792, "Invalid resumeToken: txnOpIndex was skipped", expCtx->needsMerge);
// We are running on a merging shard. Signal that we have read beyond the resume token.
return ResumeStatus::kSurpassedToken;
}
@@ -96,7 +96,7 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte
// resumable; we are past the point in the stream where the token should have appeared.
if (tokenDataFromResumedStream.uuid != tokenDataFromClient.uuid) {
// If we are running on a replica set deployment, we don't ever expect to see identical time
- // stamps and applyOpsIndex but differing UUIDs, and we reject the resume attempt at once.
+ // stamps and txnOpIndex but differing UUIDs, and we reject the resume attempt at once.
if (!expCtx->inMongos && !expCtx->needsMerge) {
return ResumeStatus::kSurpassedToken;
}
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index ff7c7674425..85d424b3bdd 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -63,19 +63,19 @@ public:
protected:
/**
* Pushes a document with a resume token corresponding to the given timestamp, version,
- * applyOpsIndex, docKey, and namespace into the mock queue.
+ * txnOpIndex, docKey, and namespace into the mock queue.
*/
void addDocument(
- Timestamp ts, int version, std::size_t applyOpsIndex, Document docKey, UUID uuid) {
+ Timestamp ts, int version, std::size_t txnOpIndex, Document docKey, UUID uuid) {
_mock->push_back(
Document{{"_id",
- ResumeToken(ResumeTokenData(ts, version, applyOpsIndex, uuid, Value(docKey)))
+ ResumeToken(ResumeTokenData(ts, version, txnOpIndex, uuid, Value(docKey)))
.toDocument()}});
}
/**
* Pushes a document with a resume token corresponding to the given timestamp, version,
- * applyOpsIndex, docKey, and namespace into the mock queue.
+ * txnOpIndex, docKey, and namespace into the mock queue.
*/
void addDocument(Timestamp ts, Document docKey, UUID uuid = testUuid()) {
addDocument(ts, 0, 0, docKey, uuid);
@@ -99,11 +99,11 @@ protected:
intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken(
Timestamp ts,
int version,
- std::size_t applyOpsIndex,
+ std::size_t txnOpIndex,
boost::optional<Document> docKey,
UUID uuid) {
auto checkResumeToken = DocumentSourceEnsureResumeTokenPresent::create(
- getExpCtx(), {ts, version, applyOpsIndex, uuid, docKey ? Value(*docKey) : Value()});
+ getExpCtx(), {ts, version, txnOpIndex, uuid, docKey ? Value(*docKey) : Value()});
checkResumeToken->setSource(_mock.get());
return checkResumeToken;
}
@@ -409,7 +409,7 @@ TEST_F(CheckResumeTokenTest,
ASSERT_THROWS_CODE(checkResumeToken->getNext(), AssertionException, 40585);
}
-TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierApplyOpsIndex) {
+TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierTxnOpIndex) {
Timestamp resumeTimestamp(100, 1);
// Create an ordered array of 3 UUIDs.
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index 1e19144b4f5..87dce6017e9 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -59,6 +59,7 @@ namespace mongo {
class ExpressionContext;
class Pipeline;
class PipelineDeleter;
+class TransactionHistoryIteratorBase;
/**
* Any functionality needed by an aggregation stage that is either context specific to a mongod or
@@ -119,10 +120,11 @@ public:
virtual DBClientBase* directClient() = 0;
/**
- * Query the oplog for an entry with a matching OpTime.
+ * Creates a new TransactionHistoryIterator object. Only applicable in processes which support
+ * locally traversing the oplog.
*/
- virtual repl::OplogEntry lookUpOplogEntryByOpTime(OperationContext* opCtx,
- repl::OpTime lookupTime) = 0;
+ virtual std::unique_ptr<TransactionHistoryIteratorBase> createTransactionHistoryIterator(
+ repl::OpTime time) const = 0;
/**
* Note that in some rare cases this could return a false negative but will never return a false
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index 16780eb1398..4c3b06ad30c 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -89,8 +89,8 @@ public:
std::vector<GenericCursor> getIdleCursors(const boost::intrusive_ptr<ExpressionContext>& expCtx,
CurrentOpUserMode userMode) const final;
- repl::OplogEntry lookUpOplogEntryByOpTime(OperationContext* opCtx,
- repl::OpTime lookupTime) final {
+ std::unique_ptr<TransactionHistoryIteratorBase> createTransactionHistoryIterator(
+ repl::OpTime time) const override {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index ca12adc1640..7d493a6ba7a 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -147,28 +147,11 @@ DBClientBase* MongoInterfaceStandalone::directClient() {
return &_client;
}
-repl::OplogEntry MongoInterfaceStandalone::lookUpOplogEntryByOpTime(OperationContext* opCtx,
- repl::OpTime lookupTime) {
- invariant(!lookupTime.isNull());
-
+std::unique_ptr<TransactionHistoryIteratorBase>
+MongoInterfaceStandalone::createTransactionHistoryIterator(repl::OpTime time) const {
bool permitYield = true;
- TransactionHistoryIterator iterator(lookupTime, permitYield);
- try {
- auto result = iterator.next(opCtx);
-
- // This function is intended to link a "commit" command to its corresponding "applyOps"
- // command, which represents a prepared transaction. There should be no additional entries
- // in the transaction's chain of operations. Note that when the oplog changes gated by
- // 'useMultipleOplogEntryFormatForTransactions' become permanent, these assumptions about
- // iterating transactions will no longer hold.
- invariant(!iterator.hasNext());
- return result;
- } catch (ExceptionFor<ErrorCodes::IncompleteTransactionHistory>& ex) {
- ex.addContext(
- "Oplog no longer has history necessary for $changeStream to observe operations from a "
- "committed transaction.");
- uasserted(ErrorCodes::ChangeStreamHistoryLost, ex.reason());
- }
+ return std::unique_ptr<TransactionHistoryIteratorBase>(
+ new TransactionHistoryIterator(time, permitYield));
}
bool MongoInterfaceStandalone::isSharded(OperationContext* opCtx, const NamespaceString& nss) {
diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h
index d6b15346f97..9c9fc301c18 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -54,8 +54,8 @@ public:
void setOperationContext(OperationContext* opCtx) final;
DBClientBase* directClient() final;
- virtual repl::OplogEntry lookUpOplogEntryByOpTime(OperationContext* opCtx,
- repl::OpTime lookupTime) final;
+ std::unique_ptr<TransactionHistoryIteratorBase> createTransactionHistoryIterator(
+ repl::OpTime time) const final;
bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final;
void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp
index e9b06a74e9e..02a3fdbccf3 100644
--- a/src/mongo/db/pipeline/resume_token.cpp
+++ b/src/mongo/db/pipeline/resume_token.cpp
@@ -59,7 +59,7 @@ ResumeTokenData makeHighWaterMarkResumeTokenData(Timestamp clusterTime,
bool ResumeTokenData::operator==(const ResumeTokenData& other) const {
return clusterTime == other.clusterTime && version == other.version &&
- tokenType == other.tokenType && applyOpsIndex == other.applyOpsIndex &&
+ tokenType == other.tokenType && txnOpIndex == other.txnOpIndex &&
fromInvalidate == other.fromInvalidate && uuid == other.uuid &&
(Value::compare(this->documentKey, other.documentKey, nullptr) == 0);
}
@@ -70,7 +70,7 @@ std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData) {
if (tokenData.version > 0) {
out << ", tokenType: " << tokenData.tokenType;
}
- out << ", applyOpsIndex: " << tokenData.applyOpsIndex;
+ out << ", txnOpIndex: " << tokenData.txnOpIndex;
if (tokenData.version > 0) {
out << ", fromInvalidate: " << static_cast<bool>(tokenData.fromInvalidate);
}
@@ -95,8 +95,8 @@ ResumeToken::ResumeToken(const Document& resumeDoc) {
}
// We encode the resume token as a KeyString with the sequence:
-// clusterTime, version, applyOpsIndex, fromInvalidate, uuid, documentKey
-// Only the clusterTime, version, applyOpsIndex, and fromInvalidate are required.
+// clusterTime, version, txnOpIndex, fromInvalidate, uuid, documentKey. Only the clusterTime,
+// version, txnOpIndex, and fromInvalidate are required.
ResumeToken::ResumeToken(const ResumeTokenData& data) {
BSONObjBuilder builder;
builder.append("", data.clusterTime);
@@ -104,7 +104,7 @@ ResumeToken::ResumeToken(const ResumeTokenData& data) {
if (data.version >= 1) {
builder.appendNumber("", data.tokenType);
}
- builder.appendNumber("", data.applyOpsIndex);
+ builder.appendNumber("", data.txnOpIndex);
if (data.version >= 1) {
builder.appendBool("", data.fromInvalidate);
}
@@ -186,15 +186,15 @@ ResumeTokenData ResumeToken::getData() const {
result.tokenType = static_cast<ResumeTokenData::TokenType>(typeInt);
}
- // Next comes the applyOps index.
- uassert(50793, "Resume Token does not contain applyOpsIndex", i.more());
- auto applyOpsElt = i.next();
+ // Next comes the txnOpIndex value.
+ uassert(50793, "Resume Token does not contain txnOpIndex", i.more());
+ auto txnOpIndexElt = i.next();
uassert(50855,
- "Resume Token applyOpsIndex is not an integer",
- applyOpsElt.type() == BSONType::NumberInt);
- const int applyOpsInd = applyOpsElt.numberInt();
- uassert(50794, "Invalid Resume Token: applyOpsIndex should be non-negative", applyOpsInd >= 0);
- result.applyOpsIndex = applyOpsInd;
+ "Resume Token txnOpIndex is not an integer",
+ txnOpIndexElt.type() == BSONType::NumberInt);
+ const int txnOpIndexInd = txnOpIndexElt.numberInt();
+ uassert(50794, "Invalid Resume Token: txnOpIndex should be non-negative", txnOpIndexInd >= 0);
+ result.txnOpIndex = txnOpIndexInd;
if (result.version >= 1) {
// The 'fromInvalidate' bool was added in version 1 resume tokens. We don't expect to see it
diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h
index 52ae1b1ab1e..236b39aa39a 100644
--- a/src/mongo/db/pipeline/resume_token.h
+++ b/src/mongo/db/pipeline/resume_token.h
@@ -60,12 +60,12 @@ struct ResumeTokenData {
ResumeTokenData(){};
ResumeTokenData(Timestamp clusterTimeIn,
int versionIn,
- size_t applyOpsIndexIn,
+ size_t txnOpIndexIn,
const boost::optional<UUID>& uuidIn,
Value documentKeyIn)
: clusterTime(clusterTimeIn),
version(versionIn),
- applyOpsIndex(applyOpsIndexIn),
+ txnOpIndex(txnOpIndexIn),
uuid(uuidIn),
documentKey(std::move(documentKeyIn)){};
@@ -77,7 +77,11 @@ struct ResumeTokenData {
Timestamp clusterTime;
int version = 1;
TokenType tokenType = TokenType::kEventToken;
- size_t applyOpsIndex = 0;
+ // When a resume token references an operation in a transaction, the 'clusterTime' stores the
+ // commit time of the transaction, and the 'txnOpIndex' field stores the index of the operation
+ // within its transaction. Operations that are not in a transaction always have a value of 0 for
+ // this field.
+ size_t txnOpIndex = 0;
// Flag to indicate that this resume token is from an "invalidate" entry. This will not be set
// on a token from a command that *would* invalidate a change stream, but rather the invalidate
// notification itself.
@@ -93,7 +97,7 @@ std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData);
* This token has the following format:
* {
* _data: String, A hex encoding of the binary generated by keystring encoding the clusterTime,
- * version, applyOpsIndex, UUID, then documentKey in that order.
+ * version, txnOpIndex, UUID, then documentKey in that order.
* _typeBits: BinData - The keystring type bits used for deserialization.
* }
* The _data field data is encoded such that string comparisons provide the correct ordering of
diff --git a/src/mongo/db/pipeline/resume_token_test.cpp b/src/mongo/db/pipeline/resume_token_test.cpp
index b52d53764dc..72894880953 100644
--- a/src/mongo/db/pipeline/resume_token_test.cpp
+++ b/src/mongo/db/pipeline/resume_token_test.cpp
@@ -257,20 +257,20 @@ TEST(ResumeToken, WrongVersionToken) {
ASSERT_THROWS(rtToken.getData(), AssertionException);
}
-TEST(ResumeToken, InvalidApplyOpsIndex) {
+TEST(ResumeToken, InvalidTxnOpIndex) {
Timestamp ts(1001, 3);
ResumeTokenData resumeTokenDataIn;
resumeTokenDataIn.clusterTime = ts;
- resumeTokenDataIn.applyOpsIndex = 1234;
+ resumeTokenDataIn.txnOpIndex = 1234;
- // Should round trip with a non-negative applyOpsIndex.
+ // Should round trip with a non-negative txnOpIndex.
auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument().toBson());
ResumeTokenData tokenData = rtToken.getData();
ASSERT_EQ(resumeTokenDataIn, tokenData);
- // Should fail with a negative applyOpsIndex.
- resumeTokenDataIn.applyOpsIndex = std::numeric_limits<size_t>::max();
+ // Should fail with a negative txnOpIndex.
+ resumeTokenDataIn.txnOpIndex = std::numeric_limits<size_t>::max();
rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument().toBson());
ASSERT_THROWS(rtToken.getData(), AssertionException);
@@ -278,7 +278,7 @@ TEST(ResumeToken, InvalidApplyOpsIndex) {
TEST(ResumeToken, StringEncodingSortsCorrectly) {
// Make sure that the string encoding of the resume tokens will compare in the correct order,
- // namely timestamp, version, applyOpsIndex, uuid, then documentKey.
+ // namely timestamp, version, txnOpIndex, uuid, then documentKey.
Timestamp ts2_2(2, 2);
Timestamp ts10_4(10, 4);
Timestamp ts10_5(10, 5);
@@ -310,8 +310,7 @@ TEST(ResumeToken, StringEncodingSortsCorrectly) {
assertLt({ts2_2, 0, 0, boost::none, Value()}, {ts2_2, 1, 0, boost::none, Value()});
assertLt({ts10_4, 5, 0, boost::none, Value()}, {ts10_4, 10, 0, boost::none, Value()});
- // Test that the Timestamp is more important than the version, applyOpsIndex, UUID and
- // documentKey.
+ // Test that the Timestamp is more important than the version, txnOpIndex, UUID and documentKey.
assertLt({ts10_4, 0, 0, lower_uuid, Value(Document{{"_id", 0}})},
{ts10_5, 0, 0, lower_uuid, Value(Document{{"_id", 0}})});
assertLt({ts2_2, 0, 0, lower_uuid, Value(Document{{"_id", 0}})},
@@ -331,14 +330,13 @@ TEST(ResumeToken, StringEncodingSortsCorrectly) {
assertLt({ts10_4, 1, 0, lower_uuid, Value(Document{{"_id", 1}})},
{ts10_4, 2, 0, lower_uuid, Value(Document{{"_id", 0}})});
- // Test that when the Timestamp and version are the same, the applyOpsIndex breaks the tie.
+ // Test that when the Timestamp and version are the same, the txnOpIndex breaks the tie.
assertLt({ts10_4, 1, 6, lower_uuid, Value(Document{{"_id", 0}})},
{ts10_4, 1, 50, lower_uuid, Value(Document{{"_id", 0}})});
assertLt({ts2_2, 0, 0, higher_uuid, Value(Document{{"_id", 0}})},
{ts2_2, 0, 4, lower_uuid, Value(Document{{"_id", 0}})});
- // Test that when the Timestamp, version, and applyOpsIndex are the same, the UUID breaks the
- // tie.
+ // Test that when the Timestamp, version, and txnOpIndex are the same, the UUID breaks the tie.
assertLt({ts2_2, 0, 0, lower_uuid, Value(Document{{"_id", 0}})},
{ts2_2, 0, 0, higher_uuid, Value(Document{{"_id", 0}})});
assertLt({ts10_4, 0, 0, lower_uuid, Value(Document{{"_id", 0}})},
@@ -350,7 +348,7 @@ TEST(ResumeToken, StringEncodingSortsCorrectly) {
assertLt({ts10_4, 0, 0, lower_uuid, Value(Document{{"_id", 1}})},
{ts10_4, 0, 0, higher_uuid, Value(Document{{"_id", 2}})});
- // Test that when the Timestamp, version, applyOpsIndex, and UUID are the same, the documentKey
+ // Test that when the Timestamp, version, txnOpIndex, and UUID are the same, the documentKey
// breaks the tie.
assertLt({ts2_2, 0, 0, lower_uuid, Value(Document{{"_id", 0}})},
{ts2_2, 0, 0, lower_uuid, Value(Document{{"_id", 1}})});
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index 4647a8e2cb8..77f76eb5c9b 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -54,8 +54,8 @@ public:
MONGO_UNREACHABLE;
}
- repl::OplogEntry lookUpOplogEntryByOpTime(OperationContext* opCtx,
- repl::OpTime lookupTime) override {
+ std::unique_ptr<TransactionHistoryIteratorBase> createTransactionHistoryIterator(
+ repl::OpTime time) const override {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/repl/oplog_interface_mock.cpp b/src/mongo/db/repl/oplog_interface_mock.cpp
index 8b96460e1e2..6352fa7566a 100644
--- a/src/mongo/db/repl/oplog_interface_mock.cpp
+++ b/src/mongo/db/repl/oplog_interface_mock.cpp
@@ -97,6 +97,10 @@ public:
MONGO_UNREACHABLE;
}
+ repl::OpTime nextOpTime(OperationContext*) override {
+ MONGO_UNREACHABLE;
+ }
+
virtual ~TransactionHistoryIteratorMock() {}
bool hasNext() const override {
diff --git a/src/mongo/db/transaction_history_iterator.cpp b/src/mongo/db/transaction_history_iterator.cpp
index 8d4e40cc87b..901fc41d05b 100644
--- a/src/mongo/db/transaction_history_iterator.cpp
+++ b/src/mongo/db/transaction_history_iterator.cpp
@@ -47,7 +47,10 @@ namespace {
/**
* Query the oplog for an entry with the given timestamp.
*/
-BSONObj findOneOplogEntry(OperationContext* opCtx, const repl::OpTime& opTime, bool permitYield) {
+BSONObj findOneOplogEntry(OperationContext* opCtx,
+ const repl::OpTime& opTime,
+ bool permitYield,
+ bool prevOpOnly = false) {
BSONObj oplogBSON;
invariant(!opTime.isNull());
@@ -55,6 +58,11 @@ BSONObj findOneOplogEntry(OperationContext* opCtx, const repl::OpTime& opTime, b
qr->setFilter(opTime.asQuery());
qr->setOplogReplay(true); // QueryOption_OplogReplay
+ if (prevOpOnly) {
+ qr->setProj(
+ BSON("_id" << 0 << repl::OplogEntry::kPrevWriteOpTimeInTransactionFieldName << 1LL));
+ }
+
const boost::intrusive_ptr<ExpressionContext> expCtx;
auto statusWithCQ = CanonicalQuery::canonicalize(opCtx,
@@ -106,15 +114,30 @@ repl::OplogEntry TransactionHistoryIterator::next(OperationContext* opCtx) {
auto oplogEntry = uassertStatusOK(repl::OplogEntry::parse(oplogBSON));
const auto& oplogPrevTsOption = oplogEntry.getPrevWriteOpTimeInTransaction();
- uassert(
- ErrorCodes::FailedToParse,
- str::stream() << "Missing prevTs field on oplog entry of previous write in transaction: "
- << redact(oplogBSON),
- oplogPrevTsOption);
+ uassert(ErrorCodes::FailedToParse,
+ str::stream()
+ << "Missing prevOpTime field on oplog entry of previous write in transaction: "
+ << redact(oplogBSON),
+ oplogPrevTsOption);
_nextOpTime = oplogPrevTsOption.value();
return oplogEntry;
}
+repl::OpTime TransactionHistoryIterator::nextOpTime(OperationContext* opCtx) {
+ BSONObj oplogBSON = findOneOplogEntry(opCtx, _nextOpTime, _permitYield, true /* prevOpOnly */);
+
+ auto prevOpTime = oplogBSON[repl::OplogEntry::kPrevWriteOpTimeInTransactionFieldName];
+ uassert(ErrorCodes::FailedToParse,
+ str::stream()
+ << "Missing prevOpTime field on oplog entry of previous write in transaction: "
+ << redact(oplogBSON),
+ !prevOpTime.eoo() && prevOpTime.isABSONObj());
+
+ auto returnOpTime = _nextOpTime;
+ _nextOpTime = repl::OpTime::parse(prevOpTime.Obj());
+ return returnOpTime;
+}
+
} // namespace mongo
diff --git a/src/mongo/db/transaction_history_iterator.h b/src/mongo/db/transaction_history_iterator.h
index daca8370f89..f3583eebc44 100644
--- a/src/mongo/db/transaction_history_iterator.h
+++ b/src/mongo/db/transaction_history_iterator.h
@@ -36,8 +36,8 @@ namespace mongo {
class OperationContext;
/**
- * An iterator class that can traverse through the oplog entries that are linked via the prevTs
- * field.
+ * An iterator class that traverses backwards through a transaction's oplog entries by following the
+ * "prevOpTime" link in each entry.
*/
class TransactionHistoryIteratorBase {
public:
@@ -49,12 +49,17 @@ public:
virtual bool hasNext() const = 0;
/**
- * Returns the next oplog entry.
+ * Returns an oplog entry and advances the iterator one step back through the oplog.
* Should not be called if hasNext is false.
* Throws if next oplog entry is in a unrecognized format or if it can't find the next oplog
* entry.
*/
virtual repl::OplogEntry next(OperationContext* opCtx) = 0;
+
+ /**
+ * Same as next() but returns only the OpTime, instead of the entire entry.
+ */
+ virtual repl::OpTime nextOpTime(OperationContext* opCtx) = 0;
};
class TransactionHistoryIterator : public TransactionHistoryIteratorBase {
@@ -66,7 +71,8 @@ public:
virtual ~TransactionHistoryIterator() = default;
bool hasNext() const override;
- repl::OplogEntry next(OperationContext* opCtx);
+ repl::OplogEntry next(OperationContext* opCtx) override;
+ repl::OpTime nextOpTime(OperationContext* opCtx) override;
private:
// Clients can set this to allow PlanExecutors created by this TransactionHistoryIterator to