diff options
author | Matthew Russotto <matthew.russotto@10gen.com> | 2019-02-25 11:17:08 -0500 |
---|---|---|
committer | Matthew Russotto <matthew.russotto@10gen.com> | 2019-03-14 13:50:46 -0400 |
commit | 3328515f7d80c8cedcaf8c0df83c6effc60330d0 (patch) | |
tree | 29aadcc611dc291b4e87ac5f14823667dbc96aff /src/mongo | |
parent | d94cbf39c2b5d1e8b46444fb0604203f86851de4 (diff) | |
download | mongo-3328515f7d80c8cedcaf8c0df83c6effc60330d0.tar.gz |
SERVER-39434 Apply the new "commit" oplog entry for unprepared large transactions
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/repl/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/idempotency_test_fixture.cpp | 49 | ||||
-rw-r--r-- | src/mongo/db/repl/idempotency_test_fixture.h | 25 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.h | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 81 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test.cpp | 405 | ||||
-rw-r--r-- | src/mongo/db/repl/transaction_oplog_application.cpp | 55 | ||||
-rw-r--r-- | src/mongo/db/repl/transaction_oplog_application.h | 10 |
9 files changed, 617 insertions, 30 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index b24fddfcfce..09f8bb57aab 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -747,8 +747,11 @@ env.CppUnitTest( ], LIBDEPS=[ '$BUILD_DIR/mongo/db/auth/authmocks', + '$BUILD_DIR/mongo/db/catalog_raii', '$BUILD_DIR/mongo/db/dbdirectclient', '$BUILD_DIR/mongo/db/commands/mongod_fcv', + '$BUILD_DIR/mongo/db/logical_session_id_helpers', + '$BUILD_DIR/mongo/db/stats/counters', '$BUILD_DIR/mongo/util/clock_source_mock', 'idempotency_test_fixture', 'oplog_buffer_blocking_queue', diff --git a/src/mongo/db/repl/idempotency_test_fixture.cpp b/src/mongo/db/repl/idempotency_test_fixture.cpp index 6d12069a194..9c7038b8d35 100644 --- a/src/mongo/db/repl/idempotency_test_fixture.cpp +++ b/src/mongo/db/repl/idempotency_test_fixture.cpp @@ -74,7 +74,8 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, OperationSessionInfo sessionInfo = {}, boost::optional<Date_t> wallClockTime = boost::none, boost::optional<StmtId> stmtId = boost::none, - boost::optional<UUID> uuid = boost::none) { + boost::optional<UUID> uuid = boost::none, + boost::optional<OpTime> prevOpTime = boost::none) { return repl::OplogEntry(opTime, // optime boost::none, // hash opType, // opType @@ -88,7 +89,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, boost::none, // upsert wallClockTime, // wall clock time stmtId, // statement id - boost::none, // optime of previous write within same transaction + prevOpTime, // optime of previous write within same transaction boost::none, // pre-image optime boost::none); // post-image optime } @@ -218,6 +219,31 @@ OplogEntry makeCommandOplogEntry(OpTime opTime, } /** + * Creates an oplog entry for 'command' with the given 'optime', 'namespace' and session information + */ +OplogEntry makeCommandOplogEntryWithSessionInfoAndStmtId(OpTime opTime, + const NamespaceString& nss, + const BSONObj& command, + LogicalSessionId lsid, + TxnNumber txnNum, + StmtId stmtId, + boost::optional<OpTime> prevOpTime) { + OperationSessionInfo info; + info.setSessionId(lsid); + info.setTxnNumber(txnNum); + return makeOplogEntry(opTime, + OpTypeEnum::kCommand, + nss.getCommandNS(), + command, + boost::none /* o2 */, + info /* sessionInfo */, + Date_t::min() /* wallClockTime -- required but not checked */, + stmtId, + boost::none /* uuid */, + prevOpTime); +} + +/** * Creates a create collection oplog entry with given optime. */ OplogEntry makeCreateCollectionOplogEntry(OpTime opTime, @@ -307,12 +333,15 @@ OplogEntry makeInsertDocumentOplogEntryWithSessionInfo(OpTime opTime, Date_t::now()); // wall clock time } -OplogEntry makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(OpTime opTime, - const NamespaceString& nss, - const BSONObj& documentToInsert, - LogicalSessionId lsid, - TxnNumber txnNum, - StmtId stmtId) { +OplogEntry makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( + OpTime opTime, + const NamespaceString& nss, + boost::optional<UUID> uuid, + const BSONObj& documentToInsert, + LogicalSessionId lsid, + TxnNumber txnNum, + StmtId stmtId, + boost::optional<OpTime> prevOpTime) { OperationSessionInfo info; info.setSessionId(lsid); info.setTxnNumber(txnNum); @@ -323,7 +352,9 @@ OplogEntry makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(OpTime opTime, boost::none, // o2 info, // session info Date_t::now(), // wall clock time - stmtId); // statement id + stmtId, + uuid, + prevOpTime); // previous optime in same session } diff --git a/src/mongo/db/repl/idempotency_test_fixture.h b/src/mongo/db/repl/idempotency_test_fixture.h index 2823f31cb41..fa0604d89dc 100644 --- a/src/mongo/db/repl/idempotency_test_fixture.h +++ b/src/mongo/db/repl/idempotency_test_fixture.h @@ -156,17 +156,28 @@ OplogEntry makeCommandOplogEntry(OpTime opTime, const BSONObj& command, boost::optional<UUID> uuid = boost::none); +OplogEntry makeCommandOplogEntryWithSessionInfoAndStmtId( + OpTime opTime, + const NamespaceString& nss, + const BSONObj& command, + LogicalSessionId lsid, + TxnNumber txnNum, + StmtId stmtId, + boost::optional<OpTime> prevOpTime = boost::none); + OplogEntry makeInsertDocumentOplogEntryWithSessionInfo(OpTime opTime, const NamespaceString& nss, const BSONObj& documentToInsert, OperationSessionInfo info); -OplogEntry makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(OpTime opTime, - const NamespaceString& nss, - const BSONObj& documentToInsert, - LogicalSessionId lsid, - TxnNumber txnNum, - StmtId stmtId); - +OplogEntry makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( + OpTime opTime, + const NamespaceString& nss, + boost::optional<UUID> uuid, + const BSONObj& documentToInsert, + LogicalSessionId lsid, + TxnNumber txnNum, + StmtId stmtId, + boost::optional<OpTime> prevOpTime = boost::none); } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index bd5485e9ae2..59781d18f47 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -1382,8 +1382,8 @@ Status applyOperation_inlock(OperationContext* opCtx, mode == repl::OplogApplication::Mode::kApplyOpsCmd || opCtx->writesAreReplicated(); OpCounters* opCounters = shouldUseGlobalOpCounters ? &globalOpCounters : &replOpCounters; - std::array<StringData, 8> names = {"ts", "t", "o", "ui", "ns", "op", "b", "o2"}; - std::array<BSONElement, 8> fields; + std::array<StringData, 9> names = {"ts", "t", "o", "ui", "ns", "op", "b", "o2", "inTxn"}; + std::array<BSONElement, 9> fields; op.getFields(names, &fields); BSONElement& fieldTs = fields[0]; BSONElement& fieldT = fields[1]; @@ -1393,11 +1393,17 @@ Status applyOperation_inlock(OperationContext* opCtx, BSONElement& fieldOp = fields[5]; BSONElement& fieldB = fields[6]; BSONElement& fieldO2 = fields[7]; + BSONElement& fieldInTxn = fields[8]; BSONObj o; if (fieldO.isABSONObj()) o = fieldO.embeddedObject(); + // Make sure we don't apply partial transactions through applyOps. + uassert(51117, + "Operations with 'inTxn' set are only used internally by secondaries.", + fieldInTxn.eoo()); + // operation type -- see logOp() comments for types const char* opType = fieldOp.valuestrsafe(); diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index 123ed06087c..46a6fe59e1c 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -116,6 +116,15 @@ public: bool isCommand() const; /** + * Returns if the oplog entry is part of a transaction that has not yet been prepared or + * committed. The actual "prepare" or "commit" oplog entries do not have an inTxn field + * and so this method will always return false for them. + */ + bool isInPendingTransaction() const { + return getInTxn() && *getInTxn(); + } + + /** * Returns if the oplog entry is for a CRUD operation. */ static bool isCrudOpType(OpTypeEnum opType); diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index bc2bfb4c37d..069e56afba4 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -49,6 +49,7 @@ #include "mongo/db/client.h" #include "mongo/db/commands/fsync.h" #include "mongo/db/commands/server_status_metric.h" +#include "mongo/db/commands/txn_cmds_gen.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/lock_state.h" #include "mongo/db/concurrency/replication_state_transition_lock_guard.h" @@ -68,6 +69,7 @@ #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/repl_set_config.h" #include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/transaction_oplog_application.h" #include "mongo/db/session.h" #include "mongo/db/session_txn_record_gen.h" #include "mongo/db/stats/timer_stats.h" @@ -802,6 +804,21 @@ void SyncTail::_oplogApplication(OplogBuffer* oplogBuffer, } } +// Returns whether an oplog entry represents a commitTransaction for a transaction which has not +// been prepared. An entry is an unprepared commit if it has a boolean "prepared" field set to +// false. +inline bool isUnpreparedCommit(const OplogEntry& entry) { + return entry.getCommandType() == OplogEntry::CommandType::kCommitTransaction && + entry.getObject()[CommitTransactionOplogObject::kPreparedFieldName].isBoolean() && + !entry.getObject()[CommitTransactionOplogObject::kPreparedFieldName].boolean(); +} + +// Returns whether an oplog entry represents an applyOps which is a self-contained atomic operation, +// as opposed to part of a prepared transaction. +inline bool isUnpreparedApplyOps(const OplogEntry& entry) { + return entry.getCommandType() == OplogEntry::CommandType::kApplyOps && !entry.shouldPrepare(); +} + // Copies ops out of the bgsync queue into the deque passed in as a parameter. // Returns true if the batch should be ended early. // Batch should end early if we encounter a command, or if @@ -870,9 +887,10 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx, return true; } - // Commands must be processed one at a time. The only exception to this is applyOps because - // applyOps oplog entries are effectively containers for CRUD operations. Therefore, it is safe - // to batch applyOps commands with CRUD operations when reading from the oplog buffer. + // Commands must be processed one at a time. The exceptions to this are unprepared applyOps, + // because applyOps oplog entries are effectively containers for CRUD operations, and unprepared + // commitTransaction, because that also expands to CRUD operations. Therefore, it is safe to + // batch applyOps commands with CRUD operations when reading from the oplog buffer. // // Oplog entries on 'system.views' should also be processed one at a time. View catalog // immediately reflects changes for each oplog entry so we can see inconsistent view catalog if @@ -880,8 +898,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx, // // Process updates to 'admin.system.version' individually as well so the secondary's FCV when // processing each operation matches the primary's when committing that operation. - if ((entry.isCommand() && - (entry.getCommandType() != OplogEntry::CommandType::kApplyOps || entry.shouldPrepare())) || + if ((entry.isCommand() && (!isUnpreparedCommit(entry) && !isUnpreparedApplyOps(entry))) || entry.getNss().isSystemDotViews() || entry.getNss().isServerConfigurationCollection()) { if (ops->getCount() == 1) { // apply commands one-at-a-time @@ -1170,7 +1187,8 @@ Status multiSyncApply(OperationContext* opCtx, * vector in any other way. * writerVectors - Set of operations for each worker thread to apply. * derivedOps - If provided, this function inserts a decomposition of applyOps operations - * and instructions for updating the transactions table. + * and instructions for updating the transactions table. Required if processing oplogs + * with transactions. * sessionUpdateTracker - if provided, keeps track of session info from ops. */ void SyncTail::_fillWriterVectors(OperationContext* opCtx, @@ -1185,6 +1203,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, const uint32_t numWriters = writerVectors->size(); CachedCollectionProperties collPropertiesCache; + LogicalSessionIdMap<std::vector<OplogEntry*>> pendingTxnOps; for (auto&& op : *ops) { // If the operation's optime is before or the same as the beginApplyingOpTime we don't want @@ -1208,6 +1227,20 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, } } + // If this entry is part of a multi-oplog-entry transaction, ignore it until the commit. + // We must save it here because we are not guaranteed it has been written to the oplog + // yet. + if (op.isInPendingTransaction()) { + auto& pendingList = pendingTxnOps[*op.getSessionId()]; + if (!pendingList.empty() && pendingList.front()->getTxnNumber() != op.getTxnNumber()) { + // TODO: When abortTransaction is implemented, this should invariant and + // the list should be cleared on abort. + pendingList.clear(); + } + pendingList.push_back(&op); + continue; + } + if (op.isCrudOpType()) { auto collProperties = collPropertiesCache.getCollectionProperties(opCtx, hashedNs); @@ -1233,7 +1266,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, // Extract applyOps operations and fill writers with extracted operations using this // function. - if (op.getCommandType() == OplogEntry::CommandType::kApplyOps && !op.shouldPrepare()) { + if (isUnpreparedApplyOps(op)) { try { derivedOps->emplace_back(ApplyOps::extractOperations(op)); @@ -1247,6 +1280,40 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, << redact(op.toBSON()))); } continue; + } else if (isUnpreparedCommit(op)) { + // On commit of unprepared transactions, get transactional operations from the oplog and + // fill writers with those operations. + try { + invariant(derivedOps); + auto& pendingList = pendingTxnOps[*op.getSessionId()]; + { + // We need to create an alternate opCtx to avoid the reads of the transaction + // messing up the state of the main opCtx. In particular we do not want to + // set the ReadSource to kLastApplied for the main opCtx. + // TODO(SERVER-40053): This should be no longer necessary after + // SERVER-40053 makes the transaction history iterator + // avoid changing the read source. + auto newClient = + opCtx->getServiceContext()->makeClient("read-pending-transactions"); + AlternativeClientRegion acr(newClient); + auto newOpCtx = cc().makeOperationContext(); + ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock( + newOpCtx->lockState()); + derivedOps->emplace_back( + readTransactionOperationsFromOplogChain(newOpCtx.get(), op, pendingList)); + pendingList.clear(); + } + // Transaction entries cannot have different session updates. + _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); + } catch (...) { + fassertFailedWithStatusNoTrace( + 51116, + exceptionToStatus().withContext(str::stream() + << "Unable to read operations for transaction " + << "commit " + << redact(op.toBSON()))); + } + continue; } auto& writer = (*writerVectors)[hash % numWriters]; diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index f080f656c4d..5cd3d140d34 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -49,6 +49,7 @@ #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/jsobj.h" +#include "mongo/db/logical_session_id_helpers.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/repl/bgsync.h" @@ -65,6 +66,8 @@ #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/db/session_catalog_mongod.h" #include "mongo/db/session_txn_record_gen.h" +#include "mongo/db/stats/counters.h" +#include "mongo/db/transaction_participant_gen.h" #include "mongo/stdx/mutex.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" @@ -470,6 +473,389 @@ TEST_F(SyncTailTest, MultiSyncApplyUsesSyncApplyToApplyOperation) { ASSERT_TRUE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection()); } +class MultiOplogEntrySyncTailTest : public SyncTailTest { + void setUp() override { + SyncTailTest::setUp(); + gUseMultipleOplogEntryFormatForTransactions = true; + } + void tearDown() override { + gUseMultipleOplogEntryFormatForTransactions = false; + SyncTailTest::tearDown(); + } +}; + +TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) { + NamespaceString nss1("test.pendingtxn1"); + NamespaceString nss2("test.pendingtxn2"); + + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid1 = createCollectionWithUuid(_opCtx.get(), nss1); + auto uuid2 = createCollectionWithUuid(_opCtx.get(), nss2); + + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(1); + + auto insertOp1 = + makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 1), 1LL}, + nss1, + uuid1, + BSON("_id" << 1), + lsid, + txnNum, + StmtId(0), + OpTime()); + insertOp1 = uassertStatusOK( + OplogEntry::parse(insertOp1.toBSON().addField(BSON("inTxn" << true).firstElement()))); + auto insertOp2 = + makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 2), 1LL}, + nss2, + uuid2, + BSON("_id" << 2), + lsid, + txnNum, + StmtId(1), + insertOp1.getOpTime()); + insertOp2 = uassertStatusOK( + OplogEntry::parse(insertOp2.toBSON().addField(BSON("inTxn" << true).firstElement()))); + auto commitOp = makeCommandOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(1), 3), 1LL}, + nss1, + BSON("commitTransaction" << 1 << "prepared" << false), + lsid, + txnNum, + StmtId(2), + insertOp2.getOpTime()); + // This re-parse puts the commit op into a normalized form for comparison. + commitOp = uassertStatusOK(OplogEntry::parse(commitOp.toBSON())); + + // Use separate vectors for each namespace as the opObserver may be called from multiple + // threads at once. + std::vector<BSONObj> insertedOplogEntries, insertedDocs1, insertedDocs2; + _opObserver->onInsertsFn = + [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) { + if (nss.isOplog()) + insertedOplogEntries.insert(insertedOplogEntries.end(), docs.begin(), docs.end()); + else if (nss == nss1) { + insertedDocs1.insert(insertedDocs1.end(), docs.begin(), docs.end()); + } else if (nss == nss2) { + insertedDocs2.insert(insertedDocs2.end(), docs.begin(), docs.end()); + } else if (nss == NamespaceString::kSessionTransactionsTableNamespace) { + // Not testing session updates for now. + } else + FAIL("Unexpected insert") << " into " << nss << " first doc: " << docs.front(); + }; + + auto writerPool = OplogApplier::makeWriterPool(); + SyncTail syncTail( + nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); + + // Apply a batch with only the first operation. This should result in the first oplog entry + // being put in the oplog, but with no effect because the operation is part of a pending + // transaction. + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp1})); + ASSERT_EQ(1U, insertedOplogEntries.size()); + ASSERT_BSONOBJ_EQ(insertedOplogEntries.back(), insertOp1.toBSON()); + ASSERT_TRUE(insertedDocs1.empty()); + ASSERT_TRUE(insertedDocs2.empty()); + + // Apply a batch with only the second operation. This should result in the second oplog entry + // being put in the oplog, but with no effect because the operation is part of a pending + // transaction. + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp2})); + ASSERT_EQ(2U, insertedOplogEntries.size()); + ASSERT_BSONOBJ_EQ(insertedOplogEntries.back(), insertOp2.toBSON()); + ASSERT_TRUE(insertedDocs1.empty()); + ASSERT_TRUE(insertedDocs2.empty()); + + // Apply a batch with only the commit. This should result in the commit being put in the + // oplog, and the two previous entries being applied. + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {commitOp})); + ASSERT_EQ(3U, insertedOplogEntries.size()); + ASSERT_EQ(1U, insertedDocs1.size()); + ASSERT_EQ(1U, insertedDocs2.size()); + ASSERT_BSONOBJ_EQ(insertedOplogEntries.back(), commitOp.toBSON()); +} + +TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionAllAtOnce) { + NamespaceString nss1("test.pendingtxn1"); + NamespaceString nss2("test.pendingtxn2"); + + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid1 = createCollectionWithUuid(_opCtx.get(), nss1); + auto uuid2 = createCollectionWithUuid(_opCtx.get(), nss2); + + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(1); + + auto insertOp1 = + makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 1), 1LL}, + nss1, + uuid1, + BSON("_id" << 1), + lsid, + txnNum, + StmtId(0), + OpTime()); + insertOp1 = uassertStatusOK( + OplogEntry::parse(insertOp1.toBSON().addField(BSON("inTxn" << true).firstElement()))); + auto insertOp2 = + makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 2), 1LL}, + nss2, + uuid2, + BSON("_id" << 2), + lsid, + txnNum, + StmtId(1), + insertOp1.getOpTime()); + insertOp2 = uassertStatusOK( + OplogEntry::parse(insertOp2.toBSON().addField(BSON("inTxn" << true).firstElement()))); + auto commitOp = makeCommandOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(1), 3), 1LL}, + nss1, + BSON("commitTransaction" << 1 << "prepared" << false), + lsid, + txnNum, + StmtId(2), + insertOp2.getOpTime()); + // This re-parse puts the commit op into a normalized form. + commitOp = uassertStatusOK(OplogEntry::parse(commitOp.toBSON())); + + // Use separate vectors for each namespace as the opObserver may be called from multiple + // threads at once. + std::vector<BSONObj> insertedOplogEntries, insertedDocs1, insertedDocs2; + _opObserver->onInsertsFn = + [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) { + if (nss.isOplog()) + insertedOplogEntries.insert(insertedOplogEntries.end(), docs.begin(), docs.end()); + else if (nss == nss1) { + insertedDocs1.insert(insertedDocs1.end(), docs.begin(), docs.end()); + } else if (nss == nss2) { + insertedDocs2.insert(insertedDocs2.end(), docs.begin(), docs.end()); + } else if (nss == NamespaceString::kSessionTransactionsTableNamespace) { + // Not testing session updates for now. + } else + FAIL("Unexpected insert") << " into " << nss << " first doc: " << docs.front(); + }; + + auto writerPool = OplogApplier::makeWriterPool(); + // Skipping writes to oplog proves we're testing the code path which does not rely on reading + // the oplog. + OplogApplier::Options applierOpts; + applierOpts.skipWritesToOplog = true; + SyncTail syncTail(nullptr, + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + writerPool.get(), + applierOpts); + + // Apply both inserts and the commit in a single batch. We expect no oplog entries to + // be inserted (because we've set skipWritesToOplog), and both entries to be committed. + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp1, insertOp2, commitOp})); + ASSERT_EQ(0U, insertedOplogEntries.size()); + ASSERT_EQ(1U, insertedDocs1.size()); + ASSERT_EQ(1U, insertedDocs2.size()); +} + +TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionTwoBatches) { + // Tests an unprepared transaction with ops both in the batch with the commit and prior + // batches. + NamespaceString nss1("test.pendingtxn1"); + NamespaceString nss2("test.pendingtxn2"); + + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid1 = createCollectionWithUuid(_opCtx.get(), nss1); + auto uuid2 = createCollectionWithUuid(_opCtx.get(), nss2); + + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(1); + + // Populate transaction with 4 linked inserts, one in nss2 and the others in nss1. + std::vector<OplogEntry> insertOps; + for (int i = 0; i < 4; i++) { + insertOps.push_back(makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(1), i + 1), 1LL}, + i == 1 ? nss2 : nss1, + i == 1 ? uuid2 : uuid1, + BSON("_id" << i), + lsid, + txnNum, + StmtId(i), + i == 0 ? OpTime() : insertOps.back().getOpTime())); + insertOps.back() = uassertStatusOK(OplogEntry::parse( + insertOps.back().toBSON().addField(BSON("inTxn" << true).firstElement()))); + } + auto commitOp = makeCommandOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(1), 5), 1LL}, + nss1, + BSON("commitTransaction" << 1 << "prepared" << false), + lsid, + txnNum, + StmtId(4), + insertOps.back().getOpTime()); + // This re-parse puts the commit op into a normalized form. + commitOp = uassertStatusOK(OplogEntry::parse(commitOp.toBSON())); + + // Use separate vectors for each namespace as the opObserver may be called from multiple + // threads at once. + std::vector<BSONObj> insertedOplogEntries, insertedDocs1, insertedDocs2; + _opObserver->onInsertsFn = + [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) { + if (nss.isOplog()) + insertedOplogEntries.insert(insertedOplogEntries.end(), docs.begin(), docs.end()); + else if (nss == nss1) { + insertedDocs1.insert(insertedDocs1.end(), docs.begin(), docs.end()); + } else if (nss == nss2) { + insertedDocs2.insert(insertedDocs2.end(), docs.begin(), docs.end()); + } else if (nss == NamespaceString::kSessionTransactionsTableNamespace) { + // Not testing session updates for now. + } else + FAIL("Unexpected insert") << " into " << nss << " first doc: " << docs.front(); + }; + + auto writerPool = OplogApplier::makeWriterPool(); + SyncTail syncTail( + nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); + + // Insert the first entry in its own batch. This should result in the oplog entry being written + // but the entry should not be applied as it is part of a pending transaction. + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOps[0]})); + ASSERT_EQ(1U, insertedOplogEntries.size()); + ASSERT_EQ(0U, insertedDocs1.size()); + ASSERT_EQ(0U, insertedDocs2.size()); + + // Insert the rest of the entries, including the commit. These entries should be added to the + // oplog, and all the entries including the first should be applied. + ASSERT_OK( + syncTail.multiApply(_opCtx.get(), {insertOps[1], insertOps[2], insertOps[3], commitOp})); + ASSERT_EQ(5U, insertedOplogEntries.size()); + ASSERT_EQ(3U, insertedDocs1.size()); + ASSERT_EQ(1U, insertedDocs2.size()); + + // Check docs and ordering of docs in nss1. + // The insert into nss2 is unordered with respect to those. + ASSERT_BSONOBJ_EQ(insertOps[0].getObject(), insertedDocs1[0]); + ASSERT_BSONOBJ_EQ(insertOps[1].getObject(), insertedDocs2.front()); + ASSERT_BSONOBJ_EQ(insertOps[2].getObject(), insertedDocs1[1]); + ASSERT_BSONOBJ_EQ(insertOps[3].getObject(), insertedDocs1[2]); +} + +TEST_F(MultiOplogEntrySyncTailTest, MultiApplyTwoTransactionsOneBatch) { + // Tests that two transactions on the same session ID in the same batch both + // apply correctly. + NamespaceString nss1("test.pendingtxn1"); + + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid1 = createCollectionWithUuid(_opCtx.get(), nss1); + + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum1(1); + TxnNumber txnNum2(2); + + std::vector<OplogEntry> insertOps1, insertOps2; + insertOps1.push_back( + makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 1), 1LL}, + nss1, + uuid1, + BSON("_id" << 1), + lsid, + txnNum1, + StmtId(0), + OpTime())); + insertOps1.back() = uassertStatusOK(OplogEntry::parse( + insertOps1.back().toBSON().addField(BSON("inTxn" << true).firstElement()))); + insertOps1.push_back( + makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 2), 1LL}, + nss1, + uuid1, + BSON("_id" << 2), + lsid, + txnNum1, + StmtId(1), + insertOps1.back().getOpTime())); + insertOps1.back() = uassertStatusOK(OplogEntry::parse( + insertOps1.back().toBSON().addField(BSON("inTxn" << true).firstElement()))); + insertOps2.push_back( + makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(2), 1), 1LL}, + nss1, + uuid1, + BSON("_id" << 3), + lsid, + txnNum2, + StmtId(0), + OpTime())); + insertOps2.back() = uassertStatusOK(OplogEntry::parse( + insertOps2.back().toBSON().addField(BSON("inTxn" << true).firstElement()))); + insertOps2.push_back( + makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(2), 2), 1LL}, + nss1, + uuid1, + BSON("_id" << 4), + lsid, + txnNum2, + StmtId(1), + insertOps2.back().getOpTime())); + insertOps2.back() = uassertStatusOK(OplogEntry::parse( + insertOps2.back().toBSON().addField(BSON("inTxn" << true).firstElement()))); + auto commitOp1 = makeCommandOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(1), 3), 1LL}, + nss1, + BSON("commitTransaction" << 1 << "prepared" << false), + lsid, + txnNum1, + StmtId(2), + insertOps1.back().getOpTime()); + auto commitOp2 = makeCommandOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(2), 3), 1LL}, + nss1, + BSON("commitTransaction" << 1 << "prepared" << false), + lsid, + txnNum2, + StmtId(2), + insertOps2.back().getOpTime()); + // This re-parse puts the commit ops into a normalized form. + commitOp1 = uassertStatusOK(OplogEntry::parse(commitOp1.toBSON())); + commitOp2 = uassertStatusOK(OplogEntry::parse(commitOp2.toBSON())); + + // Use separate vectors for each namespace as the opObserver may be called from multiple + // threads at once. + std::vector<BSONObj> insertedOplogEntries, insertedDocs1; + _opObserver->onInsertsFn = + [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) { + if (nss.isOplog()) + insertedOplogEntries.insert(insertedOplogEntries.end(), docs.begin(), docs.end()); + else if (nss == nss1) { + insertedDocs1.insert(insertedDocs1.end(), docs.begin(), docs.end()); + } else if (nss == NamespaceString::kSessionTransactionsTableNamespace) { + // Not testing session updates for now. + } else + FAIL("Unexpected insert") << " into " << nss << " first doc: " << docs.front(); + }; + + auto writerPool = OplogApplier::makeWriterPool(); + SyncTail syncTail( + nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); + + // Note the insert counter so we can check it later. It is necessary to use opCounters as + // inserts are idempotent so we will not detect duplicate inserts just by checking inserts in + // the opObserver. + int insertsBefore = replOpCounters.getInsert()->load(); + // Insert all the oplog entries in one batch. All inserts should be executed, in order, exactly + // once. + ASSERT_OK(syncTail.multiApply( + _opCtx.get(), + {insertOps1[0], insertOps1[1], commitOp1, insertOps2[0], insertOps2[1], commitOp2})); + ASSERT_EQ(6U, insertedOplogEntries.size()); + ASSERT_EQ(4, replOpCounters.getInsert()->load() - insertsBefore); + ASSERT_EQ(4U, insertedDocs1.size()); + + // Check docs and ordering of docs in nss1. + ASSERT_BSONOBJ_EQ(insertOps1[0].getObject(), insertedDocs1[0]); + ASSERT_BSONOBJ_EQ(insertOps1[1].getObject(), insertedDocs1[1]); + ASSERT_BSONOBJ_EQ(insertOps2[0].getObject(), insertedDocs1[2]); + ASSERT_BSONOBJ_EQ(insertOps2[1].getObject(), insertedDocs1[3]); +} + void testWorkerMultikeyPaths(OperationContext* opCtx, const OplogEntry& op, unsigned long numPaths) { @@ -1804,25 +2190,34 @@ TEST_F(SyncTailTxnTableTest, MultiApplyUpdatesTheTransactionTable) { ASSERT(client.runCommand(ns1.db().toString(), BSON("create" << ns1.coll()), result)); ASSERT(client.runCommand(ns2.db().toString(), BSON("create" << ns2.coll()), result)); ASSERT(client.runCommand(ns3.db().toString(), BSON("create" << ns3.coll()), result)); + auto uuid0 = [&] { + return AutoGetCollectionForRead(_opCtx.get(), ns0).getCollection()->uuid(); + }(); + auto uuid1 = [&] { + return AutoGetCollectionForRead(_opCtx.get(), ns1).getCollection()->uuid(); + }(); + auto uuid2 = [&] { + return AutoGetCollectionForRead(_opCtx.get(), ns2).getCollection()->uuid(); + }(); // Entries with a session id and a txnNumber update the transaction table. auto lsidSingle = makeLogicalSessionIdForTest(); auto opSingle = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(1), 0), 1LL}, ns0, BSON("_id" << 0), lsidSingle, 5LL, 0); + {Timestamp(Seconds(1), 0), 1LL}, ns0, uuid0, BSON("_id" << 0), lsidSingle, 5LL, 0); // For entries with the same session, the entry with a larger txnNumber is saved. auto lsidDiffTxn = makeLogicalSessionIdForTest(); auto opDiffTxnSmaller = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(2), 0), 1LL}, ns1, BSON("_id" << 0), lsidDiffTxn, 10LL, 1); + {Timestamp(Seconds(2), 0), 1LL}, ns1, uuid1, BSON("_id" << 0), lsidDiffTxn, 10LL, 1); auto opDiffTxnLarger = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(3), 0), 1LL}, ns1, BSON("_id" << 1), lsidDiffTxn, 20LL, 1); + {Timestamp(Seconds(3), 0), 1LL}, ns1, uuid1, BSON("_id" << 1), lsidDiffTxn, 20LL, 1); // For entries with the same session and txnNumber, the later optime is saved. auto lsidSameTxn = makeLogicalSessionIdForTest(); auto opSameTxnLater = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(6), 0), 1LL}, ns2, BSON("_id" << 0), lsidSameTxn, 30LL, 0); + {Timestamp(Seconds(6), 0), 1LL}, ns2, uuid2, BSON("_id" << 0), lsidSameTxn, 30LL, 0); auto opSameTxnSooner = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(5), 0), 1LL}, ns2, BSON("_id" << 1), lsidSameTxn, 30LL, 1); + {Timestamp(Seconds(5), 0), 1LL}, ns2, uuid2, BSON("_id" << 1), lsidSameTxn, 30LL, 1); // Entries with a session id but no txnNumber do not lead to updates. auto lsidNoTxn = makeLogicalSessionIdForTest(); diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp index d4921e5864f..d5076312631 100644 --- a/src/mongo/db/repl/transaction_oplog_application.cpp +++ b/src/mongo/db/repl/transaction_oplog_application.cpp @@ -79,6 +79,9 @@ Status applyCommitTransaction(OperationContext* opCtx, IDLParserErrorContext ctx("commitTransaction"); auto commitCommand = CommitTransactionOplogObject::parse(ctx, entry.getObject()); + const bool prepared = !commitCommand.getPrepared() || *commitCommand.getPrepared(); + if (!prepared) + return Status::OK(); if (mode == repl::OplogApplication::Mode::kRecovering || mode == repl::OplogApplication::Mode::kInitialSync) { @@ -139,4 +142,56 @@ Status applyAbortTransaction(OperationContext* opCtx, return Status::OK(); } +repl::MultiApplier::Operations readTransactionOperationsFromOplogChain( + OperationContext* opCtx, + const repl::OplogEntry& commitOrPrepare, + const std::vector<repl::OplogEntry*> cachedOps) { + repl::MultiApplier::Operations ops; + + // Get the previous oplog entry. + auto currentOpTime = commitOrPrepare.getOpTime(); + + // The cachedOps are the ops for this transaction that are from the same oplog application batch + // as the commit or prepare, those which have not necessarily been written to the oplog. These + // ops are in order of increasing timestamp. + + // The lastEntryOpTime is the OpTime of the last (latest OpTime) entry for this transaction + // which is expected to be present in the oplog. It is the entry before the first cachedOp, + // unless there are no cachedOps in which case it is the entry before the commit or prepare. + const auto lastEntryOpTime = (cachedOps.empty() ? commitOrPrepare : *cachedOps.front()) + .getPrevWriteOpTimeInTransaction(); + invariant(lastEntryOpTime < currentOpTime); + + TransactionHistoryIterator iter(lastEntryOpTime.get()); + // Empty commits are not allowed, but empty prepares are. + invariant(commitOrPrepare.getCommandType() != + repl::OplogEntry::CommandType::kCommitTransaction || + !cachedOps.empty() || iter.hasNext()); + auto commitOrPrepareObj = commitOrPrepare.toBSON(); + + // First retrieve and transform the ops from the oplog, which will be retrieved in reverse + // order. + while (iter.hasNext()) { + const auto& operationEntry = iter.next(opCtx); + invariant(operationEntry.isInPendingTransaction()); + // Now reconstruct the entry "as if" it were at the commit or prepare time. + BSONObjBuilder builder(operationEntry.getReplOperation().toBSON()); + builder.appendElementsUnique(commitOrPrepareObj); + ops.emplace_back(builder.obj()); + } + std::reverse(ops.begin(), ops.end()); + + // Next retrieve and transform the ops from the current batch, which are in increasing timestamp + // order. + for (auto* cachedOp : cachedOps) { + const auto& operationEntry = *cachedOp; + invariant(operationEntry.isInPendingTransaction()); + // Now reconstruct the entry "as if" it were at the commit or prepare time. + BSONObjBuilder builder(operationEntry.getReplOperation().toBSON()); + builder.appendElementsUnique(commitOrPrepareObj); + ops.emplace_back(builder.obj()); + } + return ops; +} + } // namespace mongo diff --git a/src/mongo/db/repl/transaction_oplog_application.h b/src/mongo/db/repl/transaction_oplog_application.h index ee2c77b722d..27de289fe89 100644 --- a/src/mongo/db/repl/transaction_oplog_application.h +++ b/src/mongo/db/repl/transaction_oplog_application.h @@ -30,6 +30,7 @@ #pragma once #include "mongo/db/operation_context.h" +#include "mongo/db/repl/multiapplier.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_entry.h" @@ -49,4 +50,13 @@ Status applyAbortTransaction(OperationContext* opCtx, const repl::OplogEntry& entry, repl::OplogApplication::Mode mode); +/** + * Follow an oplog chain and copy the operations to destination. Operations will be copied in + * forward oplog order (increasing optimes). + */ +repl::MultiApplier::Operations readTransactionOperationsFromOplogChain( + OperationContext* opCtx, + const repl::OplogEntry& entry, + const std::vector<repl::OplogEntry*> cachedOps); + } // namespace mongo |