summary | refs | log | tree | commit | diff
path: root/src/mongo
diff options
context:
space:
mode:
author Matthew Russotto <matthew.russotto@10gen.com> 2019-02-25 11:17:08 -0500
committer Matthew Russotto <matthew.russotto@10gen.com> 2019-03-14 13:50:46 -0400
commit 3328515f7d80c8cedcaf8c0df83c6effc60330d0 (patch)
tree 29aadcc611dc291b4e87ac5f14823667dbc96aff /src/mongo
parent d94cbf39c2b5d1e8b46444fb0604203f86851de4 (diff)
download mongo-3328515f7d80c8cedcaf8c0df83c6effc60330d0.tar.gz
SERVER-39434 Apply the new "commit" oplog entry for unprepared large transactions
Diffstat (limited to 'src/mongo')
-rw-r--r-- src/mongo/db/repl/SConscript 3
-rw-r--r-- src/mongo/db/repl/idempotency_test_fixture.cpp 49
-rw-r--r-- src/mongo/db/repl/idempotency_test_fixture.h 25
-rw-r--r-- src/mongo/db/repl/oplog.cpp 10
-rw-r--r-- src/mongo/db/repl/oplog_entry.h 9
-rw-r--r-- src/mongo/db/repl/sync_tail.cpp 81
-rw-r--r-- src/mongo/db/repl/sync_tail_test.cpp 405
-rw-r--r-- src/mongo/db/repl/transaction_oplog_application.cpp 55
-rw-r--r-- src/mongo/db/repl/transaction_oplog_application.h 10
9 files changed, 617 insertions, 30 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index b24fddfcfce..09f8bb57aab 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -747,8 +747,11 @@ env.CppUnitTest(
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/auth/authmocks',
+ '$BUILD_DIR/mongo/db/catalog_raii',
'$BUILD_DIR/mongo/db/dbdirectclient',
'$BUILD_DIR/mongo/db/commands/mongod_fcv',
+ '$BUILD_DIR/mongo/db/logical_session_id_helpers',
+ '$BUILD_DIR/mongo/db/stats/counters',
'$BUILD_DIR/mongo/util/clock_source_mock',
'idempotency_test_fixture',
'oplog_buffer_blocking_queue',
diff --git a/src/mongo/db/repl/idempotency_test_fixture.cpp b/src/mongo/db/repl/idempotency_test_fixture.cpp
index 6d12069a194..9c7038b8d35 100644
--- a/src/mongo/db/repl/idempotency_test_fixture.cpp
+++ b/src/mongo/db/repl/idempotency_test_fixture.cpp
@@ -74,7 +74,8 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
OperationSessionInfo sessionInfo = {},
boost::optional<Date_t> wallClockTime = boost::none,
boost::optional<StmtId> stmtId = boost::none,
- boost::optional<UUID> uuid = boost::none) {
+ boost::optional<UUID> uuid = boost::none,
+ boost::optional<OpTime> prevOpTime = boost::none) {
return repl::OplogEntry(opTime, // optime
boost::none, // hash
opType, // opType
@@ -88,7 +89,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
boost::none, // upsert
wallClockTime, // wall clock time
stmtId, // statement id
- boost::none, // optime of previous write within same transaction
+ prevOpTime, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none); // post-image optime
}
@@ -218,6 +219,31 @@ OplogEntry makeCommandOplogEntry(OpTime opTime,
}
/**
+ * Creates an oplog entry for 'command' with the given 'opTime', 'nss', session info and 'stmtId'.
+ */
+OplogEntry makeCommandOplogEntryWithSessionInfoAndStmtId(OpTime opTime,
+ const NamespaceString& nss,
+ const BSONObj& command,
+ LogicalSessionId lsid,
+ TxnNumber txnNum,
+ StmtId stmtId,
+ boost::optional<OpTime> prevOpTime) {
+ OperationSessionInfo info;
+ info.setSessionId(lsid);
+ info.setTxnNumber(txnNum);
+ return makeOplogEntry(opTime,
+ OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ command,
+ boost::none /* o2 */,
+ info /* sessionInfo */,
+ Date_t::min() /* wallClockTime -- required but not checked */,
+ stmtId,
+ boost::none /* uuid */,
+ prevOpTime);
+}
+
+/**
* Creates a create collection oplog entry with given optime.
*/
OplogEntry makeCreateCollectionOplogEntry(OpTime opTime,
@@ -307,12 +333,15 @@ OplogEntry makeInsertDocumentOplogEntryWithSessionInfo(OpTime opTime,
Date_t::now()); // wall clock time
}
-OplogEntry makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(OpTime opTime,
- const NamespaceString& nss,
- const BSONObj& documentToInsert,
- LogicalSessionId lsid,
- TxnNumber txnNum,
- StmtId stmtId) {
+OplogEntry makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
+ OpTime opTime,
+ const NamespaceString& nss,
+ boost::optional<UUID> uuid,
+ const BSONObj& documentToInsert,
+ LogicalSessionId lsid,
+ TxnNumber txnNum,
+ StmtId stmtId,
+ boost::optional<OpTime> prevOpTime) {
OperationSessionInfo info;
info.setSessionId(lsid);
info.setTxnNumber(txnNum);
@@ -323,7 +352,9 @@ OplogEntry makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(OpTime opTime,
boost::none, // o2
info, // session info
Date_t::now(), // wall clock time
- stmtId); // statement id
+ stmtId,
+ uuid,
+ prevOpTime); // previous optime in same session
}
diff --git a/src/mongo/db/repl/idempotency_test_fixture.h b/src/mongo/db/repl/idempotency_test_fixture.h
index 2823f31cb41..fa0604d89dc 100644
--- a/src/mongo/db/repl/idempotency_test_fixture.h
+++ b/src/mongo/db/repl/idempotency_test_fixture.h
@@ -156,17 +156,28 @@ OplogEntry makeCommandOplogEntry(OpTime opTime,
const BSONObj& command,
boost::optional<UUID> uuid = boost::none);
+OplogEntry makeCommandOplogEntryWithSessionInfoAndStmtId(
+ OpTime opTime,
+ const NamespaceString& nss,
+ const BSONObj& command,
+ LogicalSessionId lsid,
+ TxnNumber txnNum,
+ StmtId stmtId,
+ boost::optional<OpTime> prevOpTime = boost::none);
+
OplogEntry makeInsertDocumentOplogEntryWithSessionInfo(OpTime opTime,
const NamespaceString& nss,
const BSONObj& documentToInsert,
OperationSessionInfo info);
-OplogEntry makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(OpTime opTime,
- const NamespaceString& nss,
- const BSONObj& documentToInsert,
- LogicalSessionId lsid,
- TxnNumber txnNum,
- StmtId stmtId);
-
+OplogEntry makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
+ OpTime opTime,
+ const NamespaceString& nss,
+ boost::optional<UUID> uuid,
+ const BSONObj& documentToInsert,
+ LogicalSessionId lsid,
+ TxnNumber txnNum,
+ StmtId stmtId,
+ boost::optional<OpTime> prevOpTime = boost::none);
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index bd5485e9ae2..59781d18f47 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -1382,8 +1382,8 @@ Status applyOperation_inlock(OperationContext* opCtx,
mode == repl::OplogApplication::Mode::kApplyOpsCmd || opCtx->writesAreReplicated();
OpCounters* opCounters = shouldUseGlobalOpCounters ? &globalOpCounters : &replOpCounters;
- std::array<StringData, 8> names = {"ts", "t", "o", "ui", "ns", "op", "b", "o2"};
- std::array<BSONElement, 8> fields;
+ std::array<StringData, 9> names = {"ts", "t", "o", "ui", "ns", "op", "b", "o2", "inTxn"};
+ std::array<BSONElement, 9> fields;
op.getFields(names, &fields);
BSONElement& fieldTs = fields[0];
BSONElement& fieldT = fields[1];
@@ -1393,11 +1393,17 @@ Status applyOperation_inlock(OperationContext* opCtx,
BSONElement& fieldOp = fields[5];
BSONElement& fieldB = fields[6];
BSONElement& fieldO2 = fields[7];
+ BSONElement& fieldInTxn = fields[8];
BSONObj o;
if (fieldO.isABSONObj())
o = fieldO.embeddedObject();
+ // Make sure we don't apply partial transactions through applyOps.
+ uassert(51117,
+ "Operations with 'inTxn' set are only used internally by secondaries.",
+ fieldInTxn.eoo());
+
// operation type -- see logOp() comments for types
const char* opType = fieldOp.valuestrsafe();
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index 123ed06087c..46a6fe59e1c 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -116,6 +116,15 @@ public:
bool isCommand() const;
/**
+ * Returns if the oplog entry is part of a transaction that has not yet been prepared or
+ * committed. The actual "prepare" or "commit" oplog entries do not have an inTxn field
+ * and so this method will always return false for them.
+ */
+ bool isInPendingTransaction() const {
+ return getInTxn() && *getInTxn();
+ }
+
+ /**
* Returns if the oplog entry is for a CRUD operation.
*/
static bool isCrudOpType(OpTypeEnum opType);
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index bc2bfb4c37d..069e56afba4 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/client.h"
#include "mongo/db/commands/fsync.h"
#include "mongo/db/commands/server_status_metric.h"
+#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/lock_state.h"
#include "mongo/db/concurrency/replication_state_transition_lock_guard.h"
@@ -68,6 +69,7 @@
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/repl_set_config.h"
#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/repl/transaction_oplog_application.h"
#include "mongo/db/session.h"
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/stats/timer_stats.h"
@@ -802,6 +804,21 @@ void SyncTail::_oplogApplication(OplogBuffer* oplogBuffer,
}
}
+// Returns whether an oplog entry represents a commitTransaction for a transaction which has not
+// been prepared. An entry is an unprepared commit if it has a boolean "prepared" field set to
+// false.
+inline bool isUnpreparedCommit(const OplogEntry& entry) {
+ return entry.getCommandType() == OplogEntry::CommandType::kCommitTransaction &&
+ entry.getObject()[CommitTransactionOplogObject::kPreparedFieldName].isBoolean() &&
+ !entry.getObject()[CommitTransactionOplogObject::kPreparedFieldName].boolean();
+}
+
+// Returns whether an oplog entry represents an applyOps which is a self-contained atomic operation,
+// as opposed to part of a prepared transaction.
+inline bool isUnpreparedApplyOps(const OplogEntry& entry) {
+ return entry.getCommandType() == OplogEntry::CommandType::kApplyOps && !entry.shouldPrepare();
+}
+
// Copies ops out of the bgsync queue into the deque passed in as a parameter.
// Returns true if the batch should be ended early.
// Batch should end early if we encounter a command, or if
@@ -870,9 +887,10 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx,
return true;
}
- // Commands must be processed one at a time. The only exception to this is applyOps because
- // applyOps oplog entries are effectively containers for CRUD operations. Therefore, it is safe
- // to batch applyOps commands with CRUD operations when reading from the oplog buffer.
+ // Commands must be processed one at a time. The exceptions to this are unprepared applyOps,
+ // because applyOps oplog entries are effectively containers for CRUD operations, and unprepared
+ // commitTransaction, because that also expands to CRUD operations. Therefore, it is safe to
+ // batch both kinds of command with CRUD operations when reading from the oplog buffer.
//
// Oplog entries on 'system.views' should also be processed one at a time. View catalog
// immediately reflects changes for each oplog entry so we can see inconsistent view catalog if
@@ -880,8 +898,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx,
//
// Process updates to 'admin.system.version' individually as well so the secondary's FCV when
// processing each operation matches the primary's when committing that operation.
- if ((entry.isCommand() &&
- (entry.getCommandType() != OplogEntry::CommandType::kApplyOps || entry.shouldPrepare())) ||
+ if ((entry.isCommand() && (!isUnpreparedCommit(entry) && !isUnpreparedApplyOps(entry))) ||
entry.getNss().isSystemDotViews() || entry.getNss().isServerConfigurationCollection()) {
if (ops->getCount() == 1) {
// apply commands one-at-a-time
@@ -1170,7 +1187,8 @@ Status multiSyncApply(OperationContext* opCtx,
* vector in any other way.
* writerVectors - Set of operations for each worker thread to apply.
* derivedOps - If provided, this function inserts a decomposition of applyOps operations
- * and instructions for updating the transactions table.
+ * and instructions for updating the transactions table. Required if processing oplogs
+ * with transactions.
* sessionUpdateTracker - if provided, keeps track of session info from ops.
*/
void SyncTail::_fillWriterVectors(OperationContext* opCtx,
@@ -1185,6 +1203,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
const uint32_t numWriters = writerVectors->size();
CachedCollectionProperties collPropertiesCache;
+ LogicalSessionIdMap<std::vector<OplogEntry*>> pendingTxnOps;
for (auto&& op : *ops) {
// If the operation's optime is before or the same as the beginApplyingOpTime we don't want
@@ -1208,6 +1227,20 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
}
}
+ // If this entry is part of a multi-oplog-entry transaction, ignore it until the commit.
+ // We must save it here because we are not guaranteed it has been written to the oplog
+ // yet.
+ if (op.isInPendingTransaction()) {
+ auto& pendingList = pendingTxnOps[*op.getSessionId()];
+ if (!pendingList.empty() && pendingList.front()->getTxnNumber() != op.getTxnNumber()) {
+ // TODO: When abortTransaction is implemented, this should invariant and
+ // the list should be cleared on abort.
+ pendingList.clear();
+ }
+ pendingList.push_back(&op);
+ continue;
+ }
+
if (op.isCrudOpType()) {
auto collProperties = collPropertiesCache.getCollectionProperties(opCtx, hashedNs);
@@ -1233,7 +1266,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
// Extract applyOps operations and fill writers with extracted operations using this
// function.
- if (op.getCommandType() == OplogEntry::CommandType::kApplyOps && !op.shouldPrepare()) {
+ if (isUnpreparedApplyOps(op)) {
try {
derivedOps->emplace_back(ApplyOps::extractOperations(op));
@@ -1247,6 +1280,40 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
<< redact(op.toBSON())));
}
continue;
+ } else if (isUnpreparedCommit(op)) {
+ // On commit of unprepared transactions, get transactional operations from the oplog and
+ // fill writers with those operations.
+ try {
+ invariant(derivedOps);
+ auto& pendingList = pendingTxnOps[*op.getSessionId()];
+ {
+ // We need to create an alternate opCtx to avoid the reads of the transaction
+ // messing up the state of the main opCtx. In particular we do not want to
+ // set the ReadSource to kLastApplied for the main opCtx.
+ // TODO(SERVER-40053): This should be no longer necessary after
+ // SERVER-40053 makes the transaction history iterator
+ // avoid changing the read source.
+ auto newClient =
+ opCtx->getServiceContext()->makeClient("read-pending-transactions");
+ AlternativeClientRegion acr(newClient);
+ auto newOpCtx = cc().makeOperationContext();
+ ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock(
+ newOpCtx->lockState());
+ derivedOps->emplace_back(
+ readTransactionOperationsFromOplogChain(newOpCtx.get(), op, pendingList));
+ pendingList.clear();
+ }
+ // Transaction entries cannot have different session updates.
+ _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ } catch (...) {
+ fassertFailedWithStatusNoTrace(
+ 51116,
+ exceptionToStatus().withContext(str::stream()
+ << "Unable to read operations for transaction "
+ << "commit "
+ << redact(op.toBSON())));
+ }
+ continue;
}
auto& writer = (*writerVectors)[hash % numWriters];
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index f080f656c4d..5cd3d140d34 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/jsobj.h"
+#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/repl/bgsync.h"
@@ -65,6 +66,8 @@
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/session_txn_record_gen.h"
+#include "mongo/db/stats/counters.h"
+#include "mongo/db/transaction_participant_gen.h"
#include "mongo/stdx/mutex.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
@@ -470,6 +473,389 @@ TEST_F(SyncTailTest, MultiSyncApplyUsesSyncApplyToApplyOperation) {
ASSERT_TRUE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection());
}
+class MultiOplogEntrySyncTailTest : public SyncTailTest {
+ void setUp() override {
+ SyncTailTest::setUp();
+ gUseMultipleOplogEntryFormatForTransactions = true;
+ }
+ void tearDown() override {
+ gUseMultipleOplogEntryFormatForTransactions = false;
+ SyncTailTest::tearDown();
+ }
+};
+
+TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) {
+ NamespaceString nss1("test.pendingtxn1");
+ NamespaceString nss2("test.pendingtxn2");
+
+ createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace);
+ auto uuid1 = createCollectionWithUuid(_opCtx.get(), nss1);
+ auto uuid2 = createCollectionWithUuid(_opCtx.get(), nss2);
+
+ auto lsid = makeLogicalSessionId(_opCtx.get());
+ TxnNumber txnNum(1);
+
+ auto insertOp1 =
+ makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 1), 1LL},
+ nss1,
+ uuid1,
+ BSON("_id" << 1),
+ lsid,
+ txnNum,
+ StmtId(0),
+ OpTime());
+ insertOp1 = uassertStatusOK(
+ OplogEntry::parse(insertOp1.toBSON().addField(BSON("inTxn" << true).firstElement())));
+ auto insertOp2 =
+ makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 2), 1LL},
+ nss2,
+ uuid2,
+ BSON("_id" << 2),
+ lsid,
+ txnNum,
+ StmtId(1),
+ insertOp1.getOpTime());
+ insertOp2 = uassertStatusOK(
+ OplogEntry::parse(insertOp2.toBSON().addField(BSON("inTxn" << true).firstElement())));
+ auto commitOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(1), 3), 1LL},
+ nss1,
+ BSON("commitTransaction" << 1 << "prepared" << false),
+ lsid,
+ txnNum,
+ StmtId(2),
+ insertOp2.getOpTime());
+ // This re-parse puts the commit op into a normalized form for comparison.
+ commitOp = uassertStatusOK(OplogEntry::parse(commitOp.toBSON()));
+
+ // Use separate vectors for each namespace as the opObserver may be called from multiple
+ // threads at once.
+ std::vector<BSONObj> insertedOplogEntries, insertedDocs1, insertedDocs2;
+ _opObserver->onInsertsFn =
+ [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) {
+ if (nss.isOplog())
+ insertedOplogEntries.insert(insertedOplogEntries.end(), docs.begin(), docs.end());
+ else if (nss == nss1) {
+ insertedDocs1.insert(insertedDocs1.end(), docs.begin(), docs.end());
+ } else if (nss == nss2) {
+ insertedDocs2.insert(insertedDocs2.end(), docs.begin(), docs.end());
+ } else if (nss == NamespaceString::kSessionTransactionsTableNamespace) {
+ // Not testing session updates for now.
+ } else
+ FAIL("Unexpected insert") << " into " << nss << " first doc: " << docs.front();
+ };
+
+ auto writerPool = OplogApplier::makeWriterPool();
+ SyncTail syncTail(
+ nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
+
+ // Apply a batch with only the first operation. This should result in the first oplog entry
+ // being put in the oplog, but with no effect because the operation is part of a pending
+ // transaction.
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp1}));
+ ASSERT_EQ(1U, insertedOplogEntries.size());
+ ASSERT_BSONOBJ_EQ(insertedOplogEntries.back(), insertOp1.toBSON());
+ ASSERT_TRUE(insertedDocs1.empty());
+ ASSERT_TRUE(insertedDocs2.empty());
+
+ // Apply a batch with only the second operation. This should result in the second oplog entry
+ // being put in the oplog, but with no effect because the operation is part of a pending
+ // transaction.
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp2}));
+ ASSERT_EQ(2U, insertedOplogEntries.size());
+ ASSERT_BSONOBJ_EQ(insertedOplogEntries.back(), insertOp2.toBSON());
+ ASSERT_TRUE(insertedDocs1.empty());
+ ASSERT_TRUE(insertedDocs2.empty());
+
+ // Apply a batch with only the commit. This should result in the commit being put in the
+ // oplog, and the two previous entries being applied.
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {commitOp}));
+ ASSERT_EQ(3U, insertedOplogEntries.size());
+ ASSERT_EQ(1U, insertedDocs1.size());
+ ASSERT_EQ(1U, insertedDocs2.size());
+ ASSERT_BSONOBJ_EQ(insertedOplogEntries.back(), commitOp.toBSON());
+}
+
+TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionAllAtOnce) {
+ NamespaceString nss1("test.pendingtxn1");
+ NamespaceString nss2("test.pendingtxn2");
+
+ createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace);
+ auto uuid1 = createCollectionWithUuid(_opCtx.get(), nss1);
+ auto uuid2 = createCollectionWithUuid(_opCtx.get(), nss2);
+
+ auto lsid = makeLogicalSessionId(_opCtx.get());
+ TxnNumber txnNum(1);
+
+ auto insertOp1 =
+ makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 1), 1LL},
+ nss1,
+ uuid1,
+ BSON("_id" << 1),
+ lsid,
+ txnNum,
+ StmtId(0),
+ OpTime());
+ insertOp1 = uassertStatusOK(
+ OplogEntry::parse(insertOp1.toBSON().addField(BSON("inTxn" << true).firstElement())));
+ auto insertOp2 =
+ makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 2), 1LL},
+ nss2,
+ uuid2,
+ BSON("_id" << 2),
+ lsid,
+ txnNum,
+ StmtId(1),
+ insertOp1.getOpTime());
+ insertOp2 = uassertStatusOK(
+ OplogEntry::parse(insertOp2.toBSON().addField(BSON("inTxn" << true).firstElement())));
+ auto commitOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(1), 3), 1LL},
+ nss1,
+ BSON("commitTransaction" << 1 << "prepared" << false),
+ lsid,
+ txnNum,
+ StmtId(2),
+ insertOp2.getOpTime());
+ // This re-parse puts the commit op into a normalized form.
+ commitOp = uassertStatusOK(OplogEntry::parse(commitOp.toBSON()));
+
+ // Use separate vectors for each namespace as the opObserver may be called from multiple
+ // threads at once.
+ std::vector<BSONObj> insertedOplogEntries, insertedDocs1, insertedDocs2;
+ _opObserver->onInsertsFn =
+ [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) {
+ if (nss.isOplog())
+ insertedOplogEntries.insert(insertedOplogEntries.end(), docs.begin(), docs.end());
+ else if (nss == nss1) {
+ insertedDocs1.insert(insertedDocs1.end(), docs.begin(), docs.end());
+ } else if (nss == nss2) {
+ insertedDocs2.insert(insertedDocs2.end(), docs.begin(), docs.end());
+ } else if (nss == NamespaceString::kSessionTransactionsTableNamespace) {
+ // Not testing session updates for now.
+ } else
+ FAIL("Unexpected insert") << " into " << nss << " first doc: " << docs.front();
+ };
+
+ auto writerPool = OplogApplier::makeWriterPool();
+ // Skipping writes to oplog proves we're testing the code path which does not rely on reading
+ // the oplog.
+ OplogApplier::Options applierOpts;
+ applierOpts.skipWritesToOplog = true;
+ SyncTail syncTail(nullptr,
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ writerPool.get(),
+ applierOpts);
+
+ // Apply both inserts and the commit in a single batch. We expect no oplog entries to
+ // be inserted (because we've set skipWritesToOplog), and both entries to be committed.
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp1, insertOp2, commitOp}));
+ ASSERT_EQ(0U, insertedOplogEntries.size());
+ ASSERT_EQ(1U, insertedDocs1.size());
+ ASSERT_EQ(1U, insertedDocs2.size());
+}
+
+TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionTwoBatches) {
+ // Tests an unprepared transaction with ops both in the batch with the commit and prior
+ // batches.
+ NamespaceString nss1("test.pendingtxn1");
+ NamespaceString nss2("test.pendingtxn2");
+
+ createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace);
+ auto uuid1 = createCollectionWithUuid(_opCtx.get(), nss1);
+ auto uuid2 = createCollectionWithUuid(_opCtx.get(), nss2);
+
+ auto lsid = makeLogicalSessionId(_opCtx.get());
+ TxnNumber txnNum(1);
+
+ // Populate transaction with 4 linked inserts, one in nss2 and the others in nss1.
+ std::vector<OplogEntry> insertOps;
+ for (int i = 0; i < 4; i++) {
+ insertOps.push_back(makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(1), i + 1), 1LL},
+ i == 1 ? nss2 : nss1,
+ i == 1 ? uuid2 : uuid1,
+ BSON("_id" << i),
+ lsid,
+ txnNum,
+ StmtId(i),
+ i == 0 ? OpTime() : insertOps.back().getOpTime()));
+ insertOps.back() = uassertStatusOK(OplogEntry::parse(
+ insertOps.back().toBSON().addField(BSON("inTxn" << true).firstElement())));
+ }
+ auto commitOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(1), 5), 1LL},
+ nss1,
+ BSON("commitTransaction" << 1 << "prepared" << false),
+ lsid,
+ txnNum,
+ StmtId(4),
+ insertOps.back().getOpTime());
+ // This re-parse puts the commit op into a normalized form.
+ commitOp = uassertStatusOK(OplogEntry::parse(commitOp.toBSON()));
+
+ // Use separate vectors for each namespace as the opObserver may be called from multiple
+ // threads at once.
+ std::vector<BSONObj> insertedOplogEntries, insertedDocs1, insertedDocs2;
+ _opObserver->onInsertsFn =
+ [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) {
+ if (nss.isOplog())
+ insertedOplogEntries.insert(insertedOplogEntries.end(), docs.begin(), docs.end());
+ else if (nss == nss1) {
+ insertedDocs1.insert(insertedDocs1.end(), docs.begin(), docs.end());
+ } else if (nss == nss2) {
+ insertedDocs2.insert(insertedDocs2.end(), docs.begin(), docs.end());
+ } else if (nss == NamespaceString::kSessionTransactionsTableNamespace) {
+ // Not testing session updates for now.
+ } else
+ FAIL("Unexpected insert") << " into " << nss << " first doc: " << docs.front();
+ };
+
+ auto writerPool = OplogApplier::makeWriterPool();
+ SyncTail syncTail(
+ nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
+
+ // Insert the first entry in its own batch. This should result in the oplog entry being written
+ // but the entry should not be applied as it is part of a pending transaction.
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOps[0]}));
+ ASSERT_EQ(1U, insertedOplogEntries.size());
+ ASSERT_EQ(0U, insertedDocs1.size());
+ ASSERT_EQ(0U, insertedDocs2.size());
+
+ // Insert the rest of the entries, including the commit. These entries should be added to the
+ // oplog, and all the entries including the first should be applied.
+ ASSERT_OK(
+ syncTail.multiApply(_opCtx.get(), {insertOps[1], insertOps[2], insertOps[3], commitOp}));
+ ASSERT_EQ(5U, insertedOplogEntries.size());
+ ASSERT_EQ(3U, insertedDocs1.size());
+ ASSERT_EQ(1U, insertedDocs2.size());
+
+ // Check docs and ordering of docs in nss1.
+ // The insert into nss2 is unordered with respect to those.
+ ASSERT_BSONOBJ_EQ(insertOps[0].getObject(), insertedDocs1[0]);
+ ASSERT_BSONOBJ_EQ(insertOps[1].getObject(), insertedDocs2.front());
+ ASSERT_BSONOBJ_EQ(insertOps[2].getObject(), insertedDocs1[1]);
+ ASSERT_BSONOBJ_EQ(insertOps[3].getObject(), insertedDocs1[2]);
+}
+
+TEST_F(MultiOplogEntrySyncTailTest, MultiApplyTwoTransactionsOneBatch) {
+ // Tests that two transactions on the same session ID in the same batch both
+ // apply correctly.
+ NamespaceString nss1("test.pendingtxn1");
+
+ createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace);
+ auto uuid1 = createCollectionWithUuid(_opCtx.get(), nss1);
+
+ auto lsid = makeLogicalSessionId(_opCtx.get());
+ TxnNumber txnNum1(1);
+ TxnNumber txnNum2(2);
+
+ std::vector<OplogEntry> insertOps1, insertOps2;
+ insertOps1.push_back(
+ makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 1), 1LL},
+ nss1,
+ uuid1,
+ BSON("_id" << 1),
+ lsid,
+ txnNum1,
+ StmtId(0),
+ OpTime()));
+ insertOps1.back() = uassertStatusOK(OplogEntry::parse(
+ insertOps1.back().toBSON().addField(BSON("inTxn" << true).firstElement())));
+ insertOps1.push_back(
+ makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 2), 1LL},
+ nss1,
+ uuid1,
+ BSON("_id" << 2),
+ lsid,
+ txnNum1,
+ StmtId(1),
+ insertOps1.back().getOpTime()));
+ insertOps1.back() = uassertStatusOK(OplogEntry::parse(
+ insertOps1.back().toBSON().addField(BSON("inTxn" << true).firstElement())));
+ insertOps2.push_back(
+ makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(2), 1), 1LL},
+ nss1,
+ uuid1,
+ BSON("_id" << 3),
+ lsid,
+ txnNum2,
+ StmtId(0),
+ OpTime()));
+ insertOps2.back() = uassertStatusOK(OplogEntry::parse(
+ insertOps2.back().toBSON().addField(BSON("inTxn" << true).firstElement())));
+ insertOps2.push_back(
+ makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(2), 2), 1LL},
+ nss1,
+ uuid1,
+ BSON("_id" << 4),
+ lsid,
+ txnNum2,
+ StmtId(1),
+ insertOps2.back().getOpTime()));
+ insertOps2.back() = uassertStatusOK(OplogEntry::parse(
+ insertOps2.back().toBSON().addField(BSON("inTxn" << true).firstElement())));
+ auto commitOp1 = makeCommandOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(1), 3), 1LL},
+ nss1,
+ BSON("commitTransaction" << 1 << "prepared" << false),
+ lsid,
+ txnNum1,
+ StmtId(2),
+ insertOps1.back().getOpTime());
+ auto commitOp2 = makeCommandOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(2), 3), 1LL},
+ nss1,
+ BSON("commitTransaction" << 1 << "prepared" << false),
+ lsid,
+ txnNum2,
+ StmtId(2),
+ insertOps2.back().getOpTime());
+ // This re-parse puts the commit ops into a normalized form.
+ commitOp1 = uassertStatusOK(OplogEntry::parse(commitOp1.toBSON()));
+ commitOp2 = uassertStatusOK(OplogEntry::parse(commitOp2.toBSON()));
+
+ // Use separate vectors for each namespace as the opObserver may be called from multiple
+ // threads at once.
+ std::vector<BSONObj> insertedOplogEntries, insertedDocs1;
+ _opObserver->onInsertsFn =
+ [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) {
+ if (nss.isOplog())
+ insertedOplogEntries.insert(insertedOplogEntries.end(), docs.begin(), docs.end());
+ else if (nss == nss1) {
+ insertedDocs1.insert(insertedDocs1.end(), docs.begin(), docs.end());
+ } else if (nss == NamespaceString::kSessionTransactionsTableNamespace) {
+ // Not testing session updates for now.
+ } else
+ FAIL("Unexpected insert") << " into " << nss << " first doc: " << docs.front();
+ };
+
+ auto writerPool = OplogApplier::makeWriterPool();
+ SyncTail syncTail(
+ nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
+
+ // Note the insert counter so we can check it later. It is necessary to use opCounters as
+ // inserts are idempotent so we will not detect duplicate inserts just by checking inserts in
+ // the opObserver.
+ int insertsBefore = replOpCounters.getInsert()->load();
+ // Insert all the oplog entries in one batch. All inserts should be executed, in order, exactly
+ // once.
+ ASSERT_OK(syncTail.multiApply(
+ _opCtx.get(),
+ {insertOps1[0], insertOps1[1], commitOp1, insertOps2[0], insertOps2[1], commitOp2}));
+ ASSERT_EQ(6U, insertedOplogEntries.size());
+ ASSERT_EQ(4, replOpCounters.getInsert()->load() - insertsBefore);
+ ASSERT_EQ(4U, insertedDocs1.size());
+
+ // Check docs and ordering of docs in nss1.
+ ASSERT_BSONOBJ_EQ(insertOps1[0].getObject(), insertedDocs1[0]);
+ ASSERT_BSONOBJ_EQ(insertOps1[1].getObject(), insertedDocs1[1]);
+ ASSERT_BSONOBJ_EQ(insertOps2[0].getObject(), insertedDocs1[2]);
+ ASSERT_BSONOBJ_EQ(insertOps2[1].getObject(), insertedDocs1[3]);
+}
+
void testWorkerMultikeyPaths(OperationContext* opCtx,
const OplogEntry& op,
unsigned long numPaths) {
@@ -1804,25 +2190,34 @@ TEST_F(SyncTailTxnTableTest, MultiApplyUpdatesTheTransactionTable) {
ASSERT(client.runCommand(ns1.db().toString(), BSON("create" << ns1.coll()), result));
ASSERT(client.runCommand(ns2.db().toString(), BSON("create" << ns2.coll()), result));
ASSERT(client.runCommand(ns3.db().toString(), BSON("create" << ns3.coll()), result));
+ auto uuid0 = [&] {
+ return AutoGetCollectionForRead(_opCtx.get(), ns0).getCollection()->uuid();
+ }();
+ auto uuid1 = [&] {
+ return AutoGetCollectionForRead(_opCtx.get(), ns1).getCollection()->uuid();
+ }();
+ auto uuid2 = [&] {
+ return AutoGetCollectionForRead(_opCtx.get(), ns2).getCollection()->uuid();
+ }();
// Entries with a session id and a txnNumber update the transaction table.
auto lsidSingle = makeLogicalSessionIdForTest();
auto opSingle = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(1), 0), 1LL}, ns0, BSON("_id" << 0), lsidSingle, 5LL, 0);
+ {Timestamp(Seconds(1), 0), 1LL}, ns0, uuid0, BSON("_id" << 0), lsidSingle, 5LL, 0);
// For entries with the same session, the entry with a larger txnNumber is saved.
auto lsidDiffTxn = makeLogicalSessionIdForTest();
auto opDiffTxnSmaller = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(2), 0), 1LL}, ns1, BSON("_id" << 0), lsidDiffTxn, 10LL, 1);
+ {Timestamp(Seconds(2), 0), 1LL}, ns1, uuid1, BSON("_id" << 0), lsidDiffTxn, 10LL, 1);
auto opDiffTxnLarger = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(3), 0), 1LL}, ns1, BSON("_id" << 1), lsidDiffTxn, 20LL, 1);
+ {Timestamp(Seconds(3), 0), 1LL}, ns1, uuid1, BSON("_id" << 1), lsidDiffTxn, 20LL, 1);
// For entries with the same session and txnNumber, the later optime is saved.
auto lsidSameTxn = makeLogicalSessionIdForTest();
auto opSameTxnLater = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(6), 0), 1LL}, ns2, BSON("_id" << 0), lsidSameTxn, 30LL, 0);
+ {Timestamp(Seconds(6), 0), 1LL}, ns2, uuid2, BSON("_id" << 0), lsidSameTxn, 30LL, 0);
auto opSameTxnSooner = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(5), 0), 1LL}, ns2, BSON("_id" << 1), lsidSameTxn, 30LL, 1);
+ {Timestamp(Seconds(5), 0), 1LL}, ns2, uuid2, BSON("_id" << 1), lsidSameTxn, 30LL, 1);
// Entries with a session id but no txnNumber do not lead to updates.
auto lsidNoTxn = makeLogicalSessionIdForTest();
diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp
index d4921e5864f..d5076312631 100644
--- a/src/mongo/db/repl/transaction_oplog_application.cpp
+++ b/src/mongo/db/repl/transaction_oplog_application.cpp
@@ -79,6 +79,9 @@ Status applyCommitTransaction(OperationContext* opCtx,
IDLParserErrorContext ctx("commitTransaction");
auto commitCommand = CommitTransactionOplogObject::parse(ctx, entry.getObject());
+ const bool prepared = !commitCommand.getPrepared() || *commitCommand.getPrepared();
+ if (!prepared)
+ return Status::OK();
if (mode == repl::OplogApplication::Mode::kRecovering ||
mode == repl::OplogApplication::Mode::kInitialSync) {
@@ -139,4 +142,56 @@ Status applyAbortTransaction(OperationContext* opCtx,
return Status::OK();
}
+/**
+ * Reconstructs the operations of a transaction from its commit or prepare oplog entry. Operations
+ * read from the oplog chain come first, followed by those in 'cachedOps', so the result is in
+ * increasing optime order. Each returned operation is rebuilt "as if" it had been written at the
+ * time of 'commitOrPrepare'.
+ */
+repl::MultiApplier::Operations readTransactionOperationsFromOplogChain(
+    OperationContext* opCtx,
+    const repl::OplogEntry& commitOrPrepare,
+    const std::vector<repl::OplogEntry*> cachedOps) {
+    repl::MultiApplier::Operations ops;
+
+    // The optime of the commit or prepare entry itself; the rest of the chain must precede it.
+    auto currentOpTime = commitOrPrepare.getOpTime();
+
+    // The cachedOps are the ops for this transaction that are from the same oplog application batch
+    // as the commit or prepare, those which have not necessarily been written to the oplog. These
+    // ops are in order of increasing timestamp.
+
+    // The lastEntryOpTime is the OpTime of the last (latest OpTime) entry for this transaction
+    // which is expected to be present in the oplog. It is the entry before the first cachedOp,
+    // unless there are no cachedOps in which case it is the entry before the commit or prepare.
+    const auto lastEntryOpTime = (cachedOps.empty() ? commitOrPrepare : *cachedOps.front())
+                                     .getPrevWriteOpTimeInTransaction();
+    // NOTE(review): lastEntryOpTime is dereferenced with get() below. Assuming it is a
+    // boost::optional, an unset value would still satisfy the '<' comparison, so require a value
+    // explicitly before relying on it.
+    invariant(lastEntryOpTime);
+    invariant(lastEntryOpTime < currentOpTime);
+
+    TransactionHistoryIterator iter(lastEntryOpTime.get());
+    // Empty commits are not allowed, but empty prepares are.
+    invariant(commitOrPrepare.getCommandType() !=
+                  repl::OplogEntry::CommandType::kCommitTransaction ||
+              !cachedOps.empty() || iter.hasNext());
+    auto commitOrPrepareObj = commitOrPrepare.toBSON();
+
+    // Reconstructs one transaction entry "as if" it were at the commit or prepare time and
+    // appends it to 'ops'. The entry's own operation fields are written first and the fields of
+    // the commit or prepare entry are then merged in with appendElementsUnique().
+    const auto appendAsIfAtCommitOrPrepare = [&](const auto& operationEntry) {
+        invariant(operationEntry.isInPendingTransaction());
+        BSONObjBuilder builder(operationEntry.getReplOperation().toBSON());
+        builder.appendElementsUnique(commitOrPrepareObj);
+        ops.emplace_back(builder.obj());
+    };
+
+    // First retrieve and transform the ops from the oplog, which will be retrieved in reverse
+    // order.
+    while (iter.hasNext()) {
+        appendAsIfAtCommitOrPrepare(iter.next(opCtx));
+    }
+    std::reverse(ops.begin(), ops.end());
+
+    // Next retrieve and transform the ops from the current batch, which are in increasing timestamp
+    // order.
+    for (auto* cachedOp : cachedOps) {
+        appendAsIfAtCommitOrPrepare(*cachedOp);
+    }
+    return ops;
+}
+
} // namespace mongo
diff --git a/src/mongo/db/repl/transaction_oplog_application.h b/src/mongo/db/repl/transaction_oplog_application.h
index ee2c77b722d..27de289fe89 100644
--- a/src/mongo/db/repl/transaction_oplog_application.h
+++ b/src/mongo/db/repl/transaction_oplog_application.h
@@ -30,6 +30,7 @@
#pragma once
#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/multiapplier.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_entry.h"
@@ -49,4 +50,13 @@ Status applyAbortTransaction(OperationContext* opCtx,
const repl::OplogEntry& entry,
repl::OplogApplication::Mode mode);
+/**
+ * Follows the oplog chain of a transaction and returns its operations, each reconstructed as if
+ * it were at the time of 'entry'. Operations are returned in forward oplog order (increasing
+ * optimes): first those read from the oplog chain, then those in 'cachedOps'.
+ *
+ * 'entry' is the commit or prepare oplog entry of the transaction.
+ * 'cachedOps' are the operations of this transaction that come from the same oplog application
+ * batch as 'entry', and so have not necessarily been written to the oplog. They are expected to
+ * be in order of increasing timestamp.
+ */
+repl::MultiApplier::Operations readTransactionOperationsFromOplogChain(
+ OperationContext* opCtx,
+ const repl::OplogEntry& entry,
+ const std::vector<repl::OplogEntry*> cachedOps);
+
} // namespace mongo