diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/auth/auth_op_observer.h | 7 | ||||
-rw-r--r-- | src/mongo/db/catalog/uuid_catalog.h | 7 | ||||
-rw-r--r-- | src/mongo/db/free_mon/free_mon_op_observer.h | 7 | ||||
-rw-r--r-- | src/mongo/db/op_observer.h | 11 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 22 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.h | 7 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl_test.cpp | 73 | ||||
-rw-r--r-- | src/mongo/db/op_observer_noop.h | 7 | ||||
-rw-r--r-- | src/mongo/db/op_observer_registry.h | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/do_txn_test.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/s/config_server_op_observer.h | 7 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_op_observer.h | 7 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 43 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.h | 22 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant_retryable_writes_test.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant_test.cpp | 76 |
16 files changed, 239 insertions, 86 deletions
diff --git a/src/mongo/db/auth/auth_op_observer.h b/src/mongo/db/auth/auth_op_observer.h index 93fd54cc7de..918dfaf10ed 100644 --- a/src/mongo/db/auth/auth_op_observer.h +++ b/src/mongo/db/auth/auth_op_observer.h @@ -159,9 +159,12 @@ public: void onTransactionCommit(OperationContext* opCtx, boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp) final {} + boost::optional<Timestamp> commitTimestamp, + std::vector<repl::ReplOperation>& statements) final {} - void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) final {} + void onTransactionPrepare(OperationContext* opCtx, + const OplogSlot& prepareOpTime, + std::vector<repl::ReplOperation>& statements) final {} void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} diff --git a/src/mongo/db/catalog/uuid_catalog.h b/src/mongo/db/catalog/uuid_catalog.h index e7db04647fe..8d828e74953 100644 --- a/src/mongo/db/catalog/uuid_catalog.h +++ b/src/mongo/db/catalog/uuid_catalog.h @@ -155,8 +155,11 @@ public: OptionalCollectionUUID uuid) override {} void onTransactionCommit(OperationContext* opCtx, boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp) override {} - void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override {} + boost::optional<Timestamp> commitTimestamp, + std::vector<repl::ReplOperation>& statements) override {} + void onTransactionPrepare(OperationContext* opCtx, + const OplogSlot& prepareOpTime, + std::vector<repl::ReplOperation>& statements) override {} void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override {} void onReplicationRollback(OperationContext* opCtx, diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h index d11fbf00099..3cd4ba4caa0 100644 --- a/src/mongo/db/free_mon/free_mon_op_observer.h +++ b/src/mongo/db/free_mon/free_mon_op_observer.h @@ -160,9 +160,12 @@ public: void onTransactionCommit(OperationContext* opCtx, boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp) final {} + boost::optional<Timestamp> commitTimestamp, + std::vector<repl::ReplOperation>& statements) final {} - void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) final {} + void onTransactionPrepare(OperationContext* opCtx, + const OplogSlot& prepareOpTime, + std::vector<repl::ReplOperation>& statements) final {} void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h index 682a6f0311d..b0f187adb4f 100644 --- a/src/mongo/db/op_observer.h +++ b/src/mongo/db/op_observer.h @@ -287,18 +287,25 @@ public: * If the transaction was prepared, then 'commitOplogEntryOpTime' is passed in to be used as the * OpTime of the oplog entry. The 'commitTimestamp' is the timestamp at which the multi-document * transaction was committed. Either these fields should both be 'none' or neither should. + * + * The 'statements' are the list of CRUD operations to be applied in this transaction. */ virtual void onTransactionCommit(OperationContext* opCtx, boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp) = 0; + boost::optional<Timestamp> commitTimestamp, + std::vector<repl::ReplOperation>& statements) = 0; /** * The onTransactionPrepare method is called when an atomic transaction is prepared. It must be * called when a transaction is active. * * The 'prepareOpTime' is passed in to be used as the OpTime of the oplog entry. + * + * The 'statements' are the list of CRUD operations to be applied in this transaction. */ - virtual void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) = 0; + virtual void onTransactionPrepare(OperationContext* opCtx, + const OplogSlot& prepareOpTime, + std::vector<repl::ReplOperation>& statements) = 0; /** * The onTransactionAbort method is called when an atomic transaction aborts, before the diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 05820684dbc..1d189adc3f9 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -1085,16 +1085,14 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx, void OpObserverImpl::onTransactionCommit(OperationContext* opCtx, boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp) { + boost::optional<Timestamp> commitTimestamp, + std::vector<repl::ReplOperation>& statements) { invariant(opCtx->getTxnNumber()); if (!opCtx->writesAreReplicated()) { return; } - const auto txnParticipant = TransactionParticipant::get(opCtx); - invariant(txnParticipant); - if (commitOplogEntryOpTime) { invariant(commitTimestamp); invariant(!commitTimestamp->isNull()); @@ -1105,26 +1103,24 @@ void OpObserverImpl::onTransactionCommit(OperationContext* opCtx, opCtx, *commitOplogEntryOpTime, cmdObj.toBSON(), DurableTxnStateEnum::kCommitted); } else { invariant(!commitTimestamp); - const auto stmts = txnParticipant->endTransactionAndRetrieveOperations(opCtx); // It is possible that the transaction resulted in no changes. In that case, we should // not write an empty applyOps entry. - if (stmts.empty()) + if (statements.empty()) return; - const auto commitOpTime = logApplyOpsForTransaction(opCtx, stmts, OplogSlot()).writeOpTime; + const auto commitOpTime = + logApplyOpsForTransaction(opCtx, statements, OplogSlot()).writeOpTime; invariant(!commitOpTime.isNull()); } } -void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) { +void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, + const OplogSlot& prepareOpTime, + std::vector<repl::ReplOperation>& statements) { invariant(opCtx->getTxnNumber()); - const auto txnParticipant = TransactionParticipant::get(opCtx); - invariant(txnParticipant); - invariant(txnParticipant->inMultiDocumentTransaction()); invariant(!prepareOpTime.opTime.isNull()); - auto stmts = txnParticipant->endTransactionAndRetrieveOperations(opCtx); // Don't write oplog entry on secondaries. if (!opCtx->writesAreReplicated()) { @@ -1145,7 +1141,7 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, const OplogSl Lock::GlobalLock globalLock(opCtx, MODE_IX); WriteUnitOfWork wuow(opCtx); - logApplyOpsForTransaction(opCtx, stmts, prepareOpTime); + logApplyOpsForTransaction(opCtx, statements, prepareOpTime); wuow.commit(); }); } diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h index 86ced3d6f81..2d09eaa710b 100644 --- a/src/mongo/db/op_observer_impl.h +++ b/src/mongo/db/op_observer_impl.h @@ -140,8 +140,11 @@ public: OptionalCollectionUUID uuid); void onTransactionCommit(OperationContext* opCtx, boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp) final; - void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) final; + boost::optional<Timestamp> commitTimestamp, + std::vector<repl::ReplOperation>& statements) final; + void onTransactionPrepare(OperationContext* opCtx, + const OplogSlot& prepareOpTime, + std::vector<repl::ReplOperation>& statments) final; void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final; void onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo) final; diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index fff623af6ea..8d05bed6ffa 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -684,7 +684,11 @@ TEST_F(OpObserverLargeTransactionTest, TransactionTooLargeWhileCommitting) { << BSONBinData(halfTransactionData.get(), kHalfTransactionSize, BinDataGeneral))); txnParticipant->addTransactionOperation(opCtx(), operation); txnParticipant->addTransactionOperation(opCtx(), operation); - ASSERT_THROWS_CODE(opObserver().onTransactionCommit(opCtx(), boost::none, boost::none), + ASSERT_THROWS_CODE(opObserver().onTransactionCommit( + opCtx(), + boost::none, + boost::none, + txnParticipant->retrieveCompletedTransactionOperations(opCtx())), AssertionException, ErrorCodes::TransactionTooLarge); } @@ -730,7 +734,8 @@ TEST_F(OpObserverTransactionTest, TransactionalPrepareTest) { { WriteUnitOfWork wuow(opCtx()); OplogSlot slot = repl::getNextOpTime(opCtx()); - opObserver().onTransactionPrepare(opCtx(), slot); + opObserver().onTransactionPrepare( + opCtx(), slot, txnParticipant->retrieveCompletedTransactionOperations(opCtx())); opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); } @@ -805,7 +810,8 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedCommitTest) { txnParticipant->transitionToPreparedforTest(); const auto prepareSlot = repl::getNextOpTime(opCtx()); prepareTimestamp = prepareSlot.opTime.getTimestamp(); - opObserver().onTransactionPrepare(opCtx(), prepareSlot); + opObserver().onTransactionPrepare( + opCtx(), prepareSlot, txnParticipant->retrieveCompletedTransactionOperations(opCtx())); commitSlot = repl::getNextOpTime(opCtx()); } @@ -813,7 +819,13 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedCommitTest) { // Mimic committing the transaction. opCtx()->setWriteUnitOfWork(nullptr); opCtx()->lockState()->unsetMaxLockTimeout(); - opObserver().onTransactionCommit(opCtx(), commitSlot, prepareTimestamp); + + txnParticipant->transitionToCommittingWithPrepareforTest(); + opObserver().onTransactionCommit( + opCtx(), + commitSlot, + prepareTimestamp, + txnParticipant->retrieveCompletedTransactionOperations(opCtx())); repl::OplogInterfaceLocal oplogInterface(opCtx(), NamespaceString::kRsOplogNamespace.ns()); auto oplogIter = oplogInterface.makeIterator(); @@ -869,7 +881,8 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedAbortTest) { txnParticipant->transitionToPreparedforTest(); const auto prepareSlot = repl::getNextOpTime(opCtx()); - opObserver().onTransactionPrepare(opCtx(), prepareSlot); + opObserver().onTransactionPrepare( + opCtx(), prepareSlot, txnParticipant->retrieveCompletedTransactionOperations(opCtx())); abortSlot = repl::getNextOpTime(opCtx()); } @@ -947,7 +960,8 @@ TEST_F(OpObserverTransactionTest, PreparingEmptyTransactionLogsEmptyApplyOps) { { WriteUnitOfWork wuow(opCtx()); OplogSlot slot = repl::getNextOpTime(opCtx()); - opObserver().onTransactionPrepare(opCtx(), slot); + opObserver().onTransactionPrepare( + opCtx(), slot, txnParticipant->retrieveCompletedTransactionOperations(opCtx())); opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); } @@ -972,7 +986,8 @@ TEST_F(OpObserverTransactionTest, PreparingTransactionWritesToTransactionTable) WriteUnitOfWork wuow(opCtx()); OplogSlot slot = repl::getNextOpTime(opCtx()); prepareOpTime = slot.opTime; - opObserver().onTransactionPrepare(opCtx(), slot); + opObserver().onTransactionPrepare( + opCtx(), slot, txnParticipant->retrieveCompletedTransactionOperations(opCtx())); opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); } @@ -1003,7 +1018,8 @@ TEST_F(OpObserverTransactionTest, AbortingPreparedTransactionWritesToTransaction { WriteUnitOfWork wuow(opCtx()); OplogSlot slot = repl::getNextOpTime(opCtx()); - opObserver().onTransactionPrepare(opCtx(), slot); + opObserver().onTransactionPrepare( + opCtx(), slot, txnParticipant->retrieveCompletedTransactionOperations(opCtx())); opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); txnParticipant->transitionToPreparedforTest(); abortSlot = repl::getNextOpTime(opCtx()); @@ -1039,7 +1055,11 @@ TEST_F(OpObserverTransactionTest, CommittingUnpreparedNonEmptyTransactionWritesT opObserver().onInserts(opCtx(), nss, uuid, insert.begin(), insert.end(), false); } - opObserver().onTransactionCommit(opCtx(), boost::none, boost::none); + opObserver().onTransactionCommit( + opCtx(), + boost::none, + boost::none, + txnParticipant->retrieveCompletedTransactionOperations(opCtx())); opCtx()->getWriteUnitOfWork()->commit(); assertTxnRecord(txnNum(), {}, DurableTxnStateEnum::kCommitted); @@ -1050,7 +1070,11 @@ TEST_F(OpObserverTransactionTest, auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction"); - opObserver().onTransactionCommit(opCtx(), boost::none, boost::none); + opObserver().onTransactionCommit( + opCtx(), + boost::none, + boost::none, + txnParticipant->retrieveCompletedTransactionOperations(opCtx())); txnParticipant->stashTransactionResources(opCtx()); @@ -1069,7 +1093,8 @@ TEST_F(OpObserverTransactionTest, CommittingPreparedTransactionWritesToTransacti WriteUnitOfWork wuow(opCtx()); OplogSlot slot = repl::getNextOpTime(opCtx()); prepareOpTime = slot.opTime; - opObserver().onTransactionPrepare(opCtx(), slot); + opObserver().onTransactionPrepare( + opCtx(), slot, txnParticipant->retrieveCompletedTransactionOperations(opCtx())); opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); txnParticipant->transitionToPreparedforTest(); } @@ -1081,7 +1106,13 @@ TEST_F(OpObserverTransactionTest, CommittingPreparedTransactionWritesToTransacti // Mimic committing the transaction. opCtx()->setWriteUnitOfWork(nullptr); opCtx()->lockState()->unsetMaxLockTimeout(); - opObserver().onTransactionCommit(opCtx(), commitSlot, prepareOpTime.getTimestamp()); + + txnParticipant->transitionToCommittingWithPrepareforTest(); + opObserver().onTransactionCommit( + opCtx(), + commitSlot, + prepareOpTime.getTimestamp(), + txnParticipant->retrieveCompletedTransactionOperations(opCtx())); assertTxnRecord(txnNum(), commitOpTime, DurableTxnStateEnum::kCommitted); } @@ -1113,7 +1144,11 @@ TEST_F(OpObserverTransactionTest, TransactionalInsertTest) { AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false); opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false); - opObserver().onTransactionCommit(opCtx(), boost::none, boost::none); + opObserver().onTransactionCommit( + opCtx(), + boost::none, + boost::none, + txnParticipant->retrieveCompletedTransactionOperations(opCtx())); auto oplogEntryObj = getSingleOplogEntry(opCtx()); checkCommonFields(oplogEntryObj); OplogEntry oplogEntry = assertGet(OplogEntry::parse(oplogEntryObj)); @@ -1190,7 +1225,11 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTest) { AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); opObserver().onUpdate(opCtx(), update1); opObserver().onUpdate(opCtx(), update2); - opObserver().onTransactionCommit(opCtx(), boost::none, boost::none); + opObserver().onTransactionCommit( + opCtx(), + boost::none, + boost::none, + txnParticipant->retrieveCompletedTransactionOperations(opCtx())); auto oplogEntry = getSingleOplogEntry(opCtx()); checkCommonFields(oplogEntry); auto o = oplogEntry.getObjectField("o"); @@ -1243,7 +1282,11 @@ TEST_F(OpObserverTransactionTest, TransactionalDeleteTest) { BSON("_id" << 1 << "data" << "y")); opObserver().onDelete(opCtx(), nss2, uuid2, 0, false, boost::none); - opObserver().onTransactionCommit(opCtx(), boost::none, boost::none); + opObserver().onTransactionCommit( + opCtx(), + boost::none, + boost::none, + txnParticipant->retrieveCompletedTransactionOperations(opCtx())); auto oplogEntry = getSingleOplogEntry(opCtx()); checkCommonFields(oplogEntry); auto o = oplogEntry.getObjectField("o"); diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h index 3a1ad19e848..f3e6d5e3a60 100644 --- a/src/mongo/db/op_observer_noop.h +++ b/src/mongo/db/op_observer_noop.h @@ -139,8 +139,11 @@ public: OptionalCollectionUUID uuid) override {} void onTransactionCommit(OperationContext* opCtx, boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp) override{}; - void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override{}; + boost::optional<Timestamp> commitTimestamp, + std::vector<repl::ReplOperation>& statements) override{}; + void onTransactionPrepare(OperationContext* opCtx, + const OplogSlot& prepareOpTime, + std::vector<repl::ReplOperation>& statements) override{}; void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override{}; void onReplicationRollback(OperationContext* opCtx, diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h index e3cd4b642d7..5e4a69996ac 100644 --- a/src/mongo/db/op_observer_registry.h +++ b/src/mongo/db/op_observer_registry.h @@ -261,16 +261,19 @@ public: void onTransactionCommit(OperationContext* opCtx, boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp) override { + boost::optional<Timestamp> commitTimestamp, + std::vector<repl::ReplOperation>& statements) override { ReservedTimes times{opCtx}; for (auto& o : _observers) - o->onTransactionCommit(opCtx, commitOplogEntryOpTime, commitTimestamp); + o->onTransactionCommit(opCtx, commitOplogEntryOpTime, commitTimestamp, statements); } - void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override { + void onTransactionPrepare(OperationContext* opCtx, + const OplogSlot& prepareOpTime, + std::vector<repl::ReplOperation>& statements) override { ReservedTimes times{opCtx}; for (auto& observer : _observers) { - observer->onTransactionPrepare(opCtx, prepareOpTime); + observer->onTransactionPrepare(opCtx, prepareOpTime, statements); } } diff --git a/src/mongo/db/repl/do_txn_test.cpp b/src/mongo/db/repl/do_txn_test.cpp index 2e089afebbd..4c573a9c857 100644 --- a/src/mongo/db/repl/do_txn_test.cpp +++ b/src/mongo/db/repl/do_txn_test.cpp @@ -63,7 +63,8 @@ public: */ void onTransactionCommit(OperationContext* opCtx, boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp) override; + boost::optional<Timestamp> commitTimestamp, + std::vector<repl::ReplOperation>& statements) override; // If present, holds the applyOps oplog entry written out by the ObObserverImpl // onTransactionCommit. @@ -72,7 +73,8 @@ public: void OpObserverMock::onTransactionCommit(OperationContext* opCtx, boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp) { + boost::optional<Timestamp> commitTimestamp, + std::vector<repl::ReplOperation>& statements) { ASSERT(!commitOplogEntryOpTime) << commitOplogEntryOpTime->opTime; ASSERT(!commitTimestamp) << *commitTimestamp; diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h index be9b1b54c56..476e724518d 100644 --- a/src/mongo/db/s/config_server_op_observer.h +++ b/src/mongo/db/s/config_server_op_observer.h @@ -160,9 +160,12 @@ public: void onTransactionCommit(OperationContext* opCtx, boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp) override {} + boost::optional<Timestamp> commitTimestamp, + std::vector<repl::ReplOperation>& statements) override {} - void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override {} + void onTransactionPrepare(OperationContext* opCtx, + const OplogSlot& prepareOpTime, + std::vector<repl::ReplOperation>& statements) override {} void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override {} diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h index 6459ba2747a..d0fcf00b79b 100644 --- a/src/mongo/db/s/shard_server_op_observer.h +++ b/src/mongo/db/s/shard_server_op_observer.h @@ -161,9 +161,12 @@ public: void onTransactionCommit(OperationContext* opCtx, boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp) override {} + boost::optional<Timestamp> commitTimestamp, + std::vector<repl::ReplOperation>& statements) override {} - void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override {} + void onTransactionPrepare(OperationContext* opCtx, + const OplogSlot& prepareOpTime, + std::vector<repl::ReplOperation>& statements) override {} void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override {} diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 7a5d0e812e8..8f710669788 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -928,7 +928,8 @@ Timestamp TransactionParticipant::prepareTransaction(OperationContext* opCtx, // We need to unlock the session to run the opObserver onTransactionPrepare, which calls back // into the session. lk.unlock(); - opCtx->getServiceContext()->getOpObserver()->onTransactionPrepare(opCtx, prepareOplogSlot); + opCtx->getServiceContext()->getOpObserver()->onTransactionPrepare( + opCtx, prepareOplogSlot, retrieveCompletedTransactionOperations(opCtx)); abortGuard.dismiss(); @@ -992,7 +993,7 @@ void TransactionParticipant::addTransactionOperation(OperationContext* opCtx, _transactionOperationBytes <= BSONObjMaxInternalSize); } -std::vector<repl::ReplOperation> TransactionParticipant::endTransactionAndRetrieveOperations( +std::vector<repl::ReplOperation>& TransactionParticipant::retrieveCompletedTransactionOperations( OperationContext* opCtx) { stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -1000,13 +1001,32 @@ std::vector<repl::ReplOperation> TransactionParticipant::endTransactionAndRetrie // and migration, which do not check out the session. _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true); - // Ensure that we only ever end a transaction when prepared or in progress. - invariant(_txnState.isInSet(lk, TransactionState::kPrepared | TransactionState::kInProgress), + // Ensure that we only ever retrieve a transaction's completed operations when in progress, + // committing with prepare, or prepared. + invariant(_txnState.isInSet(lk, + TransactionState::kInProgress | + TransactionState::kCommittingWithPrepare | + TransactionState::kPrepared), + str::stream() << "Current state: " << _txnState); + + return _transactionOperations; +} + +void TransactionParticipant::clearOperationsInMemory(OperationContext* opCtx) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + // Always check session's txnNumber and '_txnState', since they can be modified by session kill + // and migration, which do not check out the session. + _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true); + + // Ensure that we only ever end a transaction when committing with prepare or in progress. + invariant(_txnState.isInSet( + lk, TransactionState::kCommittingWithPrepare | TransactionState::kInProgress), str::stream() << "Current state: " << _txnState); invariant(_autoCommit); _transactionOperationBytes = 0; - return std::move(_transactionOperations); + _transactionOperations.clear(); } void TransactionParticipant::commitUnpreparedTransaction(OperationContext* opCtx) { @@ -1028,7 +1048,11 @@ void TransactionParticipant::commitUnpreparedTransaction(OperationContext* opCtx lk.unlock(); auto opObserver = opCtx->getServiceContext()->getOpObserver(); invariant(opObserver); - opObserver->onTransactionCommit(opCtx, boost::none, boost::none); + opObserver->onTransactionCommit( + opCtx, boost::none, boost::none, retrieveCompletedTransactionOperations(opCtx)); + + clearOperationsInMemory(opCtx); + lk.lock(); _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true); @@ -1108,9 +1132,14 @@ void TransactionParticipant::commitPreparedTransaction(OperationContext* opCtx, { // Once the transaction is committed, the oplog entry must be written. UninterruptibleLockGuard lockGuard(opCtx->lockState()); - opObserver->onTransactionCommit(opCtx, commitOplogSlot, commitTimestamp); + opObserver->onTransactionCommit(opCtx, + commitOplogSlot, + commitTimestamp, + retrieveCompletedTransactionOperations(opCtx)); } + clearOperationsInMemory(opCtx); + lk.lock(); _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true); diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index 350bd96e953..5eee0afb03a 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -303,11 +303,19 @@ public: void addTransactionOperation(OperationContext* opCtx, const repl::ReplOperation& operation); /** - * Returns and clears the stored operations for an multi-document (non-autocommit) transaction, - * and marks the transaction as closed. It is illegal to attempt to add operations to the - * transaction after this is called. + * Returns a reference to the stored operations for a completed multi-document (non-autocommit) + * transaction. "Completed" implies that no more operations will be added to the transaction. + * It is legal to call this method only when the transaction state is in progress or committed. */ - std::vector<repl::ReplOperation> endTransactionAndRetrieveOperations(OperationContext* opCtx); + std::vector<repl::ReplOperation>& retrieveCompletedTransactionOperations( + OperationContext* opCtx); + + /** + * Clears the stored operations for an multi-document (non-autocommit) transaction, marking + * the transaction as closed. It is illegal to attempt to add operations to the transaction + * after this is called. + */ + void clearOperationsInMemory(OperationContext* opCtx); /** * Yield or reacquire locks for prepared transacitons, used on replication state transition. @@ -538,6 +546,12 @@ public: _txnState.transitionTo(lk, TransactionState::kPrepared); } + void transitionToCommittingWithPrepareforTest() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _txnState.transitionTo(lk, TransactionState::kCommittingWithPrepare); + } + + void transitionToAbortedWithoutPrepareforTest() { stdx::lock_guard<stdx::mutex> lk(_mutex); _txnState.transitionTo(lk, TransactionState::kAbortedWithoutPrepare); diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp index a65de737246..7535712469b 100644 --- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp +++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp @@ -87,9 +87,11 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, class OpObserverMock : public OpObserverNoop { public: - void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override { + void onTransactionPrepare(OperationContext* opCtx, + const OplogSlot& prepareOpTime, + std::vector<repl::ReplOperation>& statements) override { ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork()); - OpObserverNoop::onTransactionPrepare(opCtx, prepareOpTime); + OpObserverNoop::onTransactionPrepare(opCtx, prepareOpTime, statements); uassert(ErrorCodes::OperationFailed, "onTransactionPrepare() failed", @@ -104,9 +106,11 @@ public: void onTransactionCommit(OperationContext* opCtx, boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp) override { + boost::optional<Timestamp> commitTimestamp, + std::vector<repl::ReplOperation>& statements) override { ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork()); - OpObserverNoop::onTransactionCommit(opCtx, commitOplogEntryOpTime, commitTimestamp); + OpObserverNoop::onTransactionCommit( + opCtx, commitOplogEntryOpTime, commitTimestamp, statements); uassert(ErrorCodes::OperationFailed, "onTransactionCommit() failed", diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index 70c55ab773e..4541995166d 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -90,7 +90,9 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, class OpObserverMock : public OpObserverNoop { public: - void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override; + void onTransactionPrepare(OperationContext* opCtx, + const OplogSlot& prepareOpTime, + std::vector<repl::ReplOperation>& statements) override; bool onTransactionPrepareThrowsException = false; bool transactionPrepared = false; @@ -98,12 +100,15 @@ public: void onTransactionCommit(OperationContext* opCtx, boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp) override; + boost::optional<Timestamp> commitTimestamp, + std::vector<repl::ReplOperation>& statements) override; bool onTransactionCommitThrowsException = false; bool transactionCommitted = false; - stdx::function<void(boost::optional<OplogSlot>, boost::optional<Timestamp>)> + stdx::function<void( + boost::optional<OplogSlot>, boost::optional<Timestamp>, std::vector<repl::ReplOperation>&)> onTransactionCommitFn = [](boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp) {}; + boost::optional<Timestamp> commitTimestamp, + std::vector<repl::ReplOperation>& statements) {}; void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override; @@ -120,9 +125,11 @@ public: const repl::OpTime dropOpTime = {Timestamp(Seconds(100), 1U), 1LL}; }; -void OpObserverMock::onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) { +void OpObserverMock::onTransactionPrepare(OperationContext* opCtx, + const OplogSlot& prepareOpTime, + std::vector<repl::ReplOperation>& statements) { ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork()); - OpObserverNoop::onTransactionPrepare(opCtx, prepareOpTime); + OpObserverNoop::onTransactionPrepare(opCtx, prepareOpTime, statements); uassert(ErrorCodes::OperationFailed, "onTransactionPrepare() failed", @@ -133,7 +140,8 @@ void OpObserverMock::onTransactionPrepare(OperationContext* opCtx, const OplogSl void OpObserverMock::onTransactionCommit(OperationContext* opCtx, boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp) { + boost::optional<Timestamp> commitTimestamp, + std::vector<repl::ReplOperation>& statements) { if (commitOplogEntryOpTime) { invariant(commitTimestamp); ASSERT_FALSE(opCtx->lockState()->inAWriteUnitOfWork()); @@ -144,12 +152,12 @@ void OpObserverMock::onTransactionCommit(OperationContext* opCtx, ASSERT(opCtx->lockState()->inAWriteUnitOfWork()); } - OpObserverNoop::onTransactionCommit(opCtx, commitOplogEntryOpTime, commitTimestamp); + OpObserverNoop::onTransactionCommit(opCtx, commitOplogEntryOpTime, commitTimestamp, statements); uassert(ErrorCodes::OperationFailed, "onTransactionCommit() failed", !onTransactionCommitThrowsException); transactionCommitted = true; - onTransactionCommitFn(commitOplogEntryOpTime, commitTimestamp); + onTransactionCommitFn(commitOplogEntryOpTime, commitTimestamp, statements); } void OpObserverMock::onTransactionAbort(OperationContext* opCtx, @@ -511,12 +519,15 @@ TEST_F(TxnParticipantTest, CommitTransactionSetsCommitTimestampOnPreparedTransac auto originalFn = _opObserver->onTransactionCommitFn; _opObserver->onTransactionCommitFn = [&](boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp) { - originalFn(commitOplogEntryOpTime, commitTimestamp); + boost::optional<Timestamp> commitTimestamp, + std::vector<repl::ReplOperation>& statements) { + originalFn(commitOplogEntryOpTime, commitTimestamp, statements); ASSERT(commitOplogEntryOpTime); ASSERT(commitTimestamp); ASSERT_GT(*commitTimestamp, prepareTimestamp); + + ASSERT(statements.empty()); }; txnParticipant->commitPreparedTransaction(opCtx(), commitTS); @@ -548,11 +559,13 @@ TEST_F(TxnParticipantTest, CommitTransactionDoesNotSetCommitTimestampOnUnprepare auto originalFn = _opObserver->onTransactionCommitFn; _opObserver->onTransactionCommitFn = [&](boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp) { - originalFn(commitOplogEntryOpTime, commitTimestamp); + boost::optional<Timestamp> commitTimestamp, + std::vector<repl::ReplOperation>& statements) { + originalFn(commitOplogEntryOpTime, commitTimestamp, statements); ASSERT_FALSE(commitOplogEntryOpTime); ASSERT_FALSE(commitTimestamp); ASSERT(opCtx()->recoveryUnit()->getCommitTimestamp().isNull()); + ASSERT(statements.empty()); }; auto txnParticipant = TransactionParticipant::get(opCtx()); @@ -696,7 +709,21 @@ TEST_F(TxnParticipantTest, ConcurrencyOfAddTransactionOperationAndAbort) { ErrorCodes::NoSuchTransaction); } -TEST_F(TxnParticipantTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndAbort) { +TEST_F(TxnParticipantTest, ConcurrencyOfRetrieveCompletedTransactionOperationsAndAbort) { + auto sessionCheckout = checkOutSession(); + auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->unstashTransactionResources(opCtx(), "insert"); + + // The transaction may be aborted without checking out the txnParticipant. + txnParticipant->abortArbitraryTransaction(); + + // A retrieveCompletedTransactionOperations() after an abort should uassert. + ASSERT_THROWS_CODE(txnParticipant->retrieveCompletedTransactionOperations(opCtx()), + AssertionException, + ErrorCodes::NoSuchTransaction); +} + +TEST_F(TxnParticipantTest, ConcurrencyOfClearOperationsInMemory) { auto sessionCheckout = checkOutSession(); auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant->unstashTransactionResources(opCtx(), "insert"); @@ -704,8 +731,8 @@ TEST_F(TxnParticipantTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndAb // The transaction may be aborted without checking out the txnParticipant. txnParticipant->abortArbitraryTransaction(); - // An endTransactionAndRetrieveOperations() after an abort should uassert. - ASSERT_THROWS_CODE(txnParticipant->endTransactionAndRetrieveOperations(opCtx()), + // An clearOperationsInMemory() after an abort should uassert. + ASSERT_THROWS_CODE(txnParticipant->clearOperationsInMemory(opCtx()), AssertionException, ErrorCodes::NoSuchTransaction); } @@ -867,13 +894,16 @@ TEST_F(TxnParticipantTest, KillSessionsDuringPreparedCommitDoesNotAbortTransacti auto originalFn = _opObserver->onTransactionCommitFn; _opObserver->onTransactionCommitFn = [&](boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp) { - originalFn(commitOplogEntryOpTime, commitTimestamp); + boost::optional<Timestamp> commitTimestamp, + std::vector<repl::ReplOperation>& statements) { + originalFn(commitOplogEntryOpTime, commitTimestamp, statements); ASSERT(commitOplogEntryOpTime); ASSERT(commitTimestamp); ASSERT_GT(*commitTimestamp, prepareTimestamp); + ASSERT(statements.empty()); + // The transaction may be aborted without checking out the txnParticipant. txnParticipant->abortArbitraryTransaction(); ASSERT_FALSE(txnParticipant->transactionIsAborted()); @@ -984,13 +1014,16 @@ TEST_F(TxnParticipantTest, ArbitraryAbortDuringPreparedCommitDoesNotAbortTransac auto originalFn = _opObserver->onTransactionCommitFn; _opObserver->onTransactionCommitFn = [&](boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp) { - originalFn(commitOplogEntryOpTime, commitTimestamp); + boost::optional<Timestamp> commitTimestamp, + std::vector<repl::ReplOperation>& statements) { + originalFn(commitOplogEntryOpTime, commitTimestamp, statements); ASSERT(commitOplogEntryOpTime); ASSERT(commitTimestamp); ASSERT_GT(*commitTimestamp, prepareTimestamp); + ASSERT(statements.empty()); + // The transaction may be aborted without checking out the txnParticipant. auto func = [&](OperationContext* opCtx) { txnParticipant->abortArbitraryTransaction(); }; runFunctionFromDifferentOpCtx(func); @@ -1211,7 +1244,8 @@ DEATH_TEST_F(TxnParticipantTest, AbortIsIllegalDuringCommittingPreparedTransacti auto sessionId = *opCtx()->getLogicalSessionId(); auto txnNum = *opCtx()->getTxnNumber(); _opObserver->onTransactionCommitFn = [&](boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp) { + boost::optional<Timestamp> commitTimestamp, + std::vector<repl::ReplOperation>& statements) { // This should never happen. auto func = [&](OperationContext* opCtx) { opCtx->setLogicalSessionId(sessionId); |