diff options
author | Blake Oler <blake.oler@mongodb.com> | 2019-02-13 12:44:39 -0500 |
---|---|---|
committer | Blake Oler <blake.oler@mongodb.com> | 2019-02-20 15:41:41 -0500 |
commit | bdd0c8ff4cb70b3c14e2dfbe68c0baa3b26a6e82 (patch) | |
tree | 9add6eba3de4aebaf6b1e9b6b2be1ef53059598e | |
parent | 0d79175a88ee958722c0ffb276606949e695d028 (diff) | |
download | mongo-bdd0c8ff4cb70b3c14e2dfbe68c0baa3b26a6e82.tar.gz |
SERVER-39561 Split OpObserver::onTransactionCommit() into two functions for unprepared and prepared transactions respectively
-rw-r--r-- | src/mongo/db/auth/auth_op_observer.h | 12 | ||||
-rw-r--r-- | src/mongo/db/catalog/uuid_catalog.h | 11 | ||||
-rw-r--r-- | src/mongo/db/free_mon/free_mon_op_observer.h | 12 | ||||
-rw-r--r-- | src/mongo/db/op_observer.h | 29 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 44 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.h | 11 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl_test.cpp | 46 | ||||
-rw-r--r-- | src/mongo/db/op_observer_noop.h | 11 | ||||
-rw-r--r-- | src/mongo/db/op_observer_registry.h | 19 | ||||
-rw-r--r-- | src/mongo/db/repl/do_txn_test.cpp | 46 | ||||
-rw-r--r-- | src/mongo/db/s/config_server_op_observer.h | 12 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_op_observer.h | 12 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant_retryable_writes_test.cpp | 49 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant_test.cpp | 149 |
15 files changed, 268 insertions, 200 deletions
diff --git a/src/mongo/db/auth/auth_op_observer.h b/src/mongo/db/auth/auth_op_observer.h index 1f0280647eb..a8afc4baefc 100644 --- a/src/mongo/db/auth/auth_op_observer.h +++ b/src/mongo/db/auth/auth_op_observer.h @@ -156,10 +156,14 @@ public: const NamespaceString& collectionName, OptionalCollectionUUID uuid) final; - void onTransactionCommit(OperationContext* opCtx, - boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp, - std::vector<repl::ReplOperation>& statements) final {} + void onUnpreparedTransactionCommit(OperationContext* opCtx, + const std::vector<repl::ReplOperation>& statements) final {} + + void onPreparedTransactionCommit( + OperationContext* opCtx, + OplogSlot commitOplogEntryOpTime, + Timestamp commitTimestamp, + const std::vector<repl::ReplOperation>& statements) noexcept final {} void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime, diff --git a/src/mongo/db/catalog/uuid_catalog.h b/src/mongo/db/catalog/uuid_catalog.h index efe923cf216..ea86b990eee 100644 --- a/src/mongo/db/catalog/uuid_catalog.h +++ b/src/mongo/db/catalog/uuid_catalog.h @@ -152,10 +152,13 @@ public: void onEmptyCapped(OperationContext* opCtx, const NamespaceString& collectionName, OptionalCollectionUUID uuid) override {} - void onTransactionCommit(OperationContext* opCtx, - boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp, - std::vector<repl::ReplOperation>& statements) override {} + void onUnpreparedTransactionCommit( + OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) override {} + void onPreparedTransactionCommit( + OperationContext* opCtx, + OplogSlot commitOplogEntryOpTime, + Timestamp commitTimestamp, + const std::vector<repl::ReplOperation>& statements) noexcept override {} void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime, std::vector<repl::ReplOperation>& statements) override {} diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h index bd266a8d578..9aec975e8ec 100644 --- a/src/mongo/db/free_mon/free_mon_op_observer.h +++ b/src/mongo/db/free_mon/free_mon_op_observer.h @@ -157,10 +157,14 @@ public: const NamespaceString& collectionName, OptionalCollectionUUID uuid) final {} - void onTransactionCommit(OperationContext* opCtx, - boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp, - std::vector<repl::ReplOperation>& statements) final {} + void onUnpreparedTransactionCommit(OperationContext* opCtx, + const std::vector<repl::ReplOperation>& statements) final {} + + void onPreparedTransactionCommit( + OperationContext* opCtx, + OplogSlot commitOplogEntryOpTime, + Timestamp commitTimestamp, + const std::vector<repl::ReplOperation>& statements) noexcept final {} void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime, diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h index 88d7294b1f2..a6664937df6 100644 --- a/src/mongo/db/op_observer.h +++ b/src/mongo/db/op_observer.h @@ -279,20 +279,31 @@ public: virtual void onEmptyCapped(OperationContext* opCtx, const NamespaceString& collectionName, OptionalCollectionUUID uuid) = 0; + + /** + * The onUnpreparedTransactionCommit method is called on the commit of an unprepared + * transaction, before the RecoveryUnit onCommit() is called. It must not be called when no + * transaction is active. + * + * The 'statements' are the list of CRUD operations to be applied in this transaction. + */ + virtual void onUnpreparedTransactionCommit( + OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) = 0; /** - * The onTransactionCommit method is called on the commit of an atomic transaction, before the - * RecoveryUnit onCommit() is called. It must not be called when no transaction is active. + * The onPreparedTransactionCommit method is called on the commit of a prepared transaction, + * after the RecoveryUnit onCommit() is called. It must not be called when no transaction is + * active. * - * If the transaction was prepared, then 'commitOplogEntryOpTime' is passed in to be used as the - * OpTime of the oplog entry. The 'commitTimestamp' is the timestamp at which the multi-document - * transaction was committed. Either these fields should both be 'none' or neither should. + * The 'commitOplogEntryOpTime' is passed in to be used as the OpTime of the oplog entry. The + * 'commitTimestamp' is the timestamp at which the multi-document transaction was committed. * * The 'statements' are the list of CRUD operations to be applied in this transaction. */ - virtual void onTransactionCommit(OperationContext* opCtx, - boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp, - std::vector<repl::ReplOperation>& statements) = 0; + virtual void onPreparedTransactionCommit( + OperationContext* opCtx, + OplogSlot commitOplogEntryOpTime, + Timestamp commitTimestamp, + const std::vector<repl::ReplOperation>& statements) noexcept = 0; /** * The onTransactionPrepare method is called when an atomic transaction is prepared. It must be diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 17db14347b9..6996a5e772c 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -1096,36 +1096,40 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx, } // namespace -void OpObserverImpl::onTransactionCommit(OperationContext* opCtx, - boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp, - std::vector<repl::ReplOperation>& statements) { +void OpObserverImpl::onUnpreparedTransactionCommit( + OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) { invariant(opCtx->getTxnNumber()); if (!opCtx->writesAreReplicated()) { return; } - if (commitOplogEntryOpTime) { - invariant(commitTimestamp); - invariant(!commitTimestamp->isNull()); + // It is possible that the transaction resulted in no changes. In that case, we should + // not write an empty applyOps entry. + if (statements.empty()) + return; - CommitTransactionOplogObject cmdObj; - cmdObj.setCommitTimestamp(*commitTimestamp); - logCommitOrAbortForPreparedTransaction( - opCtx, *commitOplogEntryOpTime, cmdObj.toBSON(), DurableTxnStateEnum::kCommitted); - } else { - invariant(!commitTimestamp); + const auto commitOpTime = logApplyOpsForTransaction(opCtx, statements, OplogSlot()).writeOpTime; + invariant(!commitOpTime.isNull()); +} - // It is possible that the transaction resulted in no changes. In that case, we should - // not write an empty applyOps entry. - if (statements.empty()) - return; +void OpObserverImpl::onPreparedTransactionCommit( + OperationContext* opCtx, + OplogSlot commitOplogEntryOpTime, + Timestamp commitTimestamp, + const std::vector<repl::ReplOperation>& statements) noexcept { + invariant(opCtx->getTxnNumber()); - const auto commitOpTime = - logApplyOpsForTransaction(opCtx, statements, OplogSlot()).writeOpTime; - invariant(!commitOpTime.isNull()); + if (!opCtx->writesAreReplicated()) { + return; } + + invariant(!commitTimestamp.isNull()); + + CommitTransactionOplogObject cmdObj; + cmdObj.setCommitTimestamp(commitTimestamp); + logCommitOrAbortForPreparedTransaction( + opCtx, commitOplogEntryOpTime, cmdObj.toBSON(), DurableTxnStateEnum::kCommitted); } void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h index d9f5d701227..bfec7d9f41e 100644 --- a/src/mongo/db/op_observer_impl.h +++ b/src/mongo/db/op_observer_impl.h @@ -137,10 +137,13 @@ public: void onEmptyCapped(OperationContext* opCtx, const NamespaceString& collectionName, OptionalCollectionUUID uuid); - void onTransactionCommit(OperationContext* opCtx, - boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp, - std::vector<repl::ReplOperation>& statements) final; + void onUnpreparedTransactionCommit(OperationContext* opCtx, + const std::vector<repl::ReplOperation>& statements) final; + void onPreparedTransactionCommit( + OperationContext* opCtx, + OplogSlot commitOplogEntryOpTime, + Timestamp commitTimestamp, + const std::vector<repl::ReplOperation>& statements) noexcept final; void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime, std::vector<repl::ReplOperation>& statments) final; diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index 48877eee1a0..6c4828710e3 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -683,11 +683,8 @@ TEST_F(OpObserverLargeTransactionTest, TransactionTooLargeWhileCommitting) { << BSONBinData(halfTransactionData.get(), kHalfTransactionSize, BinDataGeneral))); txnParticipant.addTransactionOperation(opCtx(), operation); txnParticipant.addTransactionOperation(opCtx(), operation); - ASSERT_THROWS_CODE(opObserver().onTransactionCommit( - opCtx(), - boost::none, - boost::none, - txnParticipant.retrieveCompletedTransactionOperations(opCtx())), + ASSERT_THROWS_CODE(opObserver().onUnpreparedTransactionCommit( + opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx())), AssertionException, ErrorCodes::TransactionTooLarge); } @@ -820,7 +817,7 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedCommitTest) { opCtx()->lockState()->unsetMaxLockTimeout(); txnParticipant.transitionToCommittingWithPrepareforTest(opCtx()); - opObserver().onTransactionCommit( + opObserver().onPreparedTransactionCommit( opCtx(), commitSlot, prepareTimestamp, @@ -1054,11 +1051,8 @@ TEST_F(OpObserverTransactionTest, CommittingUnpreparedNonEmptyTransactionWritesT opObserver().onInserts(opCtx(), nss, uuid, insert.begin(), insert.end(), false); } - opObserver().onTransactionCommit( - opCtx(), - boost::none, - boost::none, - txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + opObserver().onUnpreparedTransactionCommit( + opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx())); opCtx()->getWriteUnitOfWork()->commit(); assertTxnRecord(txnNum(), {}, DurableTxnStateEnum::kCommitted); @@ -1069,11 +1063,8 @@ TEST_F(OpObserverTransactionTest, auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction"); - opObserver().onTransactionCommit( - opCtx(), - boost::none, - boost::none, - txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + opObserver().onUnpreparedTransactionCommit( + opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx())); txnParticipant.stashTransactionResources(opCtx()); @@ -1107,7 +1098,7 @@ TEST_F(OpObserverTransactionTest, CommittingPreparedTransactionWritesToTransacti opCtx()->lockState()->unsetMaxLockTimeout(); txnParticipant.transitionToCommittingWithPrepareforTest(opCtx()); - opObserver().onTransactionCommit( + opObserver().onPreparedTransactionCommit( opCtx(), commitSlot, prepareOpTime.getTimestamp(), @@ -1143,11 +1134,8 @@ TEST_F(OpObserverTransactionTest, TransactionalInsertTest) { AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false); opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false); - opObserver().onTransactionCommit( - opCtx(), - boost::none, - boost::none, - txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + opObserver().onUnpreparedTransactionCommit( + opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx())); auto oplogEntryObj = getSingleOplogEntry(opCtx()); checkCommonFields(oplogEntryObj); OplogEntry oplogEntry = assertGet(OplogEntry::parse(oplogEntryObj)); @@ -1224,11 +1212,8 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTest) { AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); opObserver().onUpdate(opCtx(), update1); opObserver().onUpdate(opCtx(), update2); - opObserver().onTransactionCommit( - opCtx(), - boost::none, - boost::none, - txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + opObserver().onUnpreparedTransactionCommit( + opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx())); auto oplogEntry = getSingleOplogEntry(opCtx()); checkCommonFields(oplogEntry); auto o = oplogEntry.getObjectField("o"); @@ -1281,11 +1266,8 @@ TEST_F(OpObserverTransactionTest, TransactionalDeleteTest) { BSON("_id" << 1 << "data" << "y")); opObserver().onDelete(opCtx(), nss2, uuid2, 0, false, boost::none); - opObserver().onTransactionCommit( - opCtx(), - boost::none, - boost::none, - txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + opObserver().onUnpreparedTransactionCommit( + opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx())); auto oplogEntry = getSingleOplogEntry(opCtx()); checkCommonFields(oplogEntry); auto o = oplogEntry.getObjectField("o"); diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h index 225bf6995e5..b4290305eca 100644 --- a/src/mongo/db/op_observer_noop.h +++ b/src/mongo/db/op_observer_noop.h @@ -136,10 +136,13 @@ public: void onEmptyCapped(OperationContext* opCtx, const NamespaceString& collectionName, OptionalCollectionUUID uuid) override {} - void onTransactionCommit(OperationContext* opCtx, - boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp, - std::vector<repl::ReplOperation>& statements) override{}; + void onUnpreparedTransactionCommit( + OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) override{}; + void onPreparedTransactionCommit( + OperationContext* opCtx, + OplogSlot commitOplogEntryOpTime, + Timestamp commitTimestamp, + const std::vector<repl::ReplOperation>& statements) noexcept override{}; void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime, std::vector<repl::ReplOperation>& statements) override{}; diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h index 6ea895855d1..946ee8893e9 100644 --- a/src/mongo/db/op_observer_registry.h +++ b/src/mongo/db/op_observer_registry.h @@ -258,13 +258,22 @@ public: o->onEmptyCapped(opCtx, collectionName, uuid); } - void onTransactionCommit(OperationContext* opCtx, - boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp, - std::vector<repl::ReplOperation>& statements) override { + void onUnpreparedTransactionCommit( + OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) override { ReservedTimes times{opCtx}; for (auto& o : _observers) - o->onTransactionCommit(opCtx, commitOplogEntryOpTime, commitTimestamp, statements); + o->onUnpreparedTransactionCommit(opCtx, statements); + } + + void onPreparedTransactionCommit( + OperationContext* opCtx, + OplogSlot commitOplogEntryOpTime, + Timestamp commitTimestamp, + const std::vector<repl::ReplOperation>& statements) noexcept override { + ReservedTimes times{opCtx}; + for (auto& o : _observers) + o->onPreparedTransactionCommit( + opCtx, commitOplogEntryOpTime, commitTimestamp, statements); } void onTransactionPrepare(OperationContext* opCtx, diff --git a/src/mongo/db/repl/do_txn_test.cpp b/src/mongo/db/repl/do_txn_test.cpp index 8bd9f3c6d39..97174546e31 100644 --- a/src/mongo/db/repl/do_txn_test.cpp +++ b/src/mongo/db/repl/do_txn_test.cpp @@ -52,6 +52,13 @@ namespace mongo { namespace repl { namespace { +boost::optional<OplogEntry> onAllTransactionCommit(OperationContext* opCtx) { + OplogInterfaceLocal oplogInterface(opCtx, NamespaceString::kRsOplogNamespace.ns()); + auto oplogIter = oplogInterface.makeIterator(); + auto opEntry = unittest::assertGet(oplogIter->next()); + return unittest::assertGet(OplogEntry::parse(opEntry.first)); +} + /** * Mock OpObserver that tracks doTxn commit events. */ @@ -60,27 +67,36 @@ public: /** * Called by doTxn() when ops are ready to commit. */ - void onTransactionCommit(OperationContext* opCtx, - boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp, - std::vector<repl::ReplOperation>& statements) override; + void onUnpreparedTransactionCommit(OperationContext* opCtx, + const std::vector<repl::ReplOperation>& statements) override; + + + /** + * Called by doTxn() when ops are ready to commit. + */ + void onPreparedTransactionCommit( + OperationContext* opCtx, + OplogSlot commitOplogEntryOpTime, + Timestamp commitTimestamp, + const std::vector<repl::ReplOperation>& statements) noexcept override; // If present, holds the applyOps oplog entry written out by the ObObserverImpl - // onTransactionCommit. + // onPreparedTransactionCommit or onUnpreparedTransactionCommit. boost::optional<OplogEntry> applyOpsOplogEntry; }; -void OpObserverMock::onTransactionCommit(OperationContext* opCtx, - boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp, - std::vector<repl::ReplOperation>& statements) { - ASSERT(!commitOplogEntryOpTime) << commitOplogEntryOpTime->opTime; - ASSERT(!commitTimestamp) << *commitTimestamp; +void OpObserverMock::onUnpreparedTransactionCommit( + OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) { - OplogInterfaceLocal oplogInterface(opCtx, NamespaceString::kRsOplogNamespace.ns()); - auto oplogIter = oplogInterface.makeIterator(); - auto opEntry = unittest::assertGet(oplogIter->next()); - applyOpsOplogEntry = unittest::assertGet(OplogEntry::parse(opEntry.first)); + applyOpsOplogEntry = onAllTransactionCommit(opCtx); +} + +void OpObserverMock::onPreparedTransactionCommit( + OperationContext* opCtx, + OplogSlot commitOplogEntryOpTime, + Timestamp commitTimestamp, + const std::vector<repl::ReplOperation>& statements) noexcept { + applyOpsOplogEntry = onAllTransactionCommit(opCtx); } /** diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h index 8199e4b32a8..2ab3ea5e89b 100644 --- a/src/mongo/db/s/config_server_op_observer.h +++ b/src/mongo/db/s/config_server_op_observer.h @@ -157,10 +157,14 @@ public: const NamespaceString& collectionName, OptionalCollectionUUID uuid) override {} - void onTransactionCommit(OperationContext* opCtx, - boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp, - std::vector<repl::ReplOperation>& statements) override {} + void onUnpreparedTransactionCommit( + OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) override {} + + void onPreparedTransactionCommit( + OperationContext* opCtx, + OplogSlot commitOplogEntryOpTime, + Timestamp commitTimestamp, + const std::vector<repl::ReplOperation>& statements) noexcept override {} void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime, diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h index 6667715622c..8703db1dfce 100644 --- a/src/mongo/db/s/shard_server_op_observer.h +++ b/src/mongo/db/s/shard_server_op_observer.h @@ -158,10 +158,14 @@ public: const NamespaceString& collectionName, OptionalCollectionUUID uuid) override {} - void onTransactionCommit(OperationContext* opCtx, - boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp, - std::vector<repl::ReplOperation>& statements) override {} + void onUnpreparedTransactionCommit( + OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) override {} + + void onPreparedTransactionCommit( + OperationContext* opCtx, + OplogSlot commitOplogEntryOpTime, + Timestamp commitTimestamp, + const std::vector<repl::ReplOperation>& statements) noexcept override {} void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime, diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 941fa6c5b06..5a7a781e3a2 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -1049,8 +1049,7 @@ void TransactionParticipant::Participant::commitUnpreparedTransaction(OperationC auto opObserver = opCtx->getServiceContext()->getOpObserver(); invariant(opObserver); - opObserver->onTransactionCommit( - opCtx, boost::none, boost::none, retrieveCompletedTransactionOperations(opCtx)); + opObserver->onUnpreparedTransactionCommit(opCtx, retrieveCompletedTransactionOperations(opCtx)); clearOperationsInMemory(opCtx); { stdx::lock_guard<Client> lk(*opCtx->getClient()); @@ -1129,7 +1128,7 @@ void TransactionParticipant::Participant::commitPreparedTransaction( invariant(opObserver); // Once the transaction is committed, the oplog entry must be written. - opObserver->onTransactionCommit( + opObserver->onPreparedTransactionCommit( opCtx, commitOplogSlot, commitTimestamp, retrieveCompletedTransactionOperations(opCtx)); clearOperationsInMemory(opCtx); diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp index 5271df46282..30d8274aed7 100644 --- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp +++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp @@ -103,27 +103,48 @@ public: bool transactionPrepared = false; stdx::function<void()> onTransactionPrepareFn = [this]() { transactionPrepared = true; }; - void onTransactionCommit(OperationContext* opCtx, - boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp, - std::vector<repl::ReplOperation>& statements) override { + void onUnpreparedTransactionCommit( + OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) override { ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork()); - OpObserverNoop::onTransactionCommit( + OpObserverNoop::onUnpreparedTransactionCommit(opCtx, statements); + + uassert(ErrorCodes::OperationFailed, + "onUnpreparedTransactionCommit() failed", + !onUnpreparedTransactionCommitThrowsException); + + onUnpreparedTransactionCommitFn(); + } + + bool onUnpreparedTransactionCommitThrowsException = false; + bool unpreparedTransactionCommitted = false; + + stdx::function<void()> onUnpreparedTransactionCommitFn = [this]() { + unpreparedTransactionCommitted = true; + }; + + + void onPreparedTransactionCommit( + OperationContext* opCtx, + OplogSlot commitOplogEntryOpTime, + Timestamp commitTimestamp, + const std::vector<repl::ReplOperation>& statements) noexcept override { + ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork()); + OpObserverNoop::onPreparedTransactionCommit( opCtx, commitOplogEntryOpTime, commitTimestamp, statements); uassert(ErrorCodes::OperationFailed, - "onTransactionCommit() failed", - !onTransactionCommitThrowsException); + "onPreparedTransactionCommit() failed", + !onPreparedTransactionCommitThrowsException); - onTransactionCommitFn(commitOplogEntryOpTime, commitTimestamp); + onPreparedTransactionCommitFn(commitOplogEntryOpTime, commitTimestamp); } - bool onTransactionCommitThrowsException = false; - bool transactionCommitted = false; - stdx::function<void(boost::optional<OplogSlot>, boost::optional<Timestamp>)> - onTransactionCommitFn = - [this](boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp) { transactionCommitted = true; }; + bool onPreparedTransactionCommitThrowsException = false; + bool preparedTransactionCommitted = false; + stdx::function<void(OplogSlot, Timestamp)> onPreparedTransactionCommitFn = + [this](OplogSlot commitOplogEntryOpTime, Timestamp commitTimestamp) { + preparedTransactionCommitted = true; + }; repl::OpTime onDropCollection(OperationContext* opCtx, const NamespaceString& collectionName, diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index adf777840fa..731666fb6e4 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -98,17 +98,25 @@ public: bool transactionPrepared = false; stdx::function<void()> onTransactionPrepareFn = []() {}; - void onTransactionCommit(OperationContext* opCtx, - boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp, - std::vector<repl::ReplOperation>& statements) override; - bool onTransactionCommitThrowsException = false; - bool transactionCommitted = false; - stdx::function<void( - boost::optional<OplogSlot>, boost::optional<Timestamp>, std::vector<repl::ReplOperation>&)> - onTransactionCommitFn = [](boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp, - std::vector<repl::ReplOperation>& statements) {}; + void onUnpreparedTransactionCommit(OperationContext* opCtx, + const std::vector<repl::ReplOperation>& statements) override; + bool onUnpreparedTransactionCommitThrowsException = false; + bool unpreparedTransactionCommitted = false; + stdx::function<void(const std::vector<repl::ReplOperation>&)> onUnpreparedTransactionCommitFn = + [](const std::vector<repl::ReplOperation>& statements) {}; + + + void onPreparedTransactionCommit( + OperationContext* opCtx, + OplogSlot commitOplogEntryOpTime, + Timestamp commitTimestamp, + const std::vector<repl::ReplOperation>& statements) noexcept override; + bool onPreparedTransactionCommitThrowsException = false; + bool preparedTransactionCommitted = false; + stdx::function<void(OplogSlot, Timestamp, const std::vector<repl::ReplOperation>&)> + onPreparedTransactionCommitFn = [](OplogSlot commitOplogEntryOpTime, + Timestamp commitTimestamp, + const std::vector<repl::ReplOperation>& statements) {}; void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override; @@ -138,26 +146,36 @@ void OpObserverMock::onTransactionPrepare(OperationContext* opCtx, onTransactionPrepareFn(); } -void OpObserverMock::onTransactionCommit(OperationContext* opCtx, - boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp, - std::vector<repl::ReplOperation>& statements) { - if (commitOplogEntryOpTime) { - invariant(commitTimestamp); - ASSERT_FALSE(opCtx->lockState()->inAWriteUnitOfWork()); - // The 'commitTimestamp' must be cleared before we write the oplog entry. - ASSERT(opCtx->recoveryUnit()->getCommitTimestamp().isNull()); - } else { - invariant(!commitTimestamp); - ASSERT(opCtx->lockState()->inAWriteUnitOfWork()); - } +void OpObserverMock::onUnpreparedTransactionCommit( + OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) { + ASSERT(opCtx->lockState()->inAWriteUnitOfWork()); + + OpObserverNoop::onUnpreparedTransactionCommit(opCtx, statements); + + uassert(ErrorCodes::OperationFailed, + "onUnpreparedTransactionCommit() failed", + !onUnpreparedTransactionCommitThrowsException); + + unpreparedTransactionCommitted = true; + onUnpreparedTransactionCommitFn(statements); +} - OpObserverNoop::onTransactionCommit(opCtx, commitOplogEntryOpTime, commitTimestamp, statements); +void OpObserverMock::onPreparedTransactionCommit( + OperationContext* opCtx, + OplogSlot commitOplogEntryOpTime, + Timestamp commitTimestamp, + const std::vector<repl::ReplOperation>& statements) noexcept { + ASSERT_FALSE(opCtx->lockState()->inAWriteUnitOfWork()); + // The 'commitTimestamp' must be cleared before we write the oplog entry. + ASSERT(opCtx->recoveryUnit()->getCommitTimestamp().isNull()); + + OpObserverNoop::onPreparedTransactionCommit( + opCtx, commitOplogEntryOpTime, commitTimestamp, statements); uassert(ErrorCodes::OperationFailed, - "onTransactionCommit() failed", - !onTransactionCommitThrowsException); - transactionCommitted = true; - onTransactionCommitFn(commitOplogEntryOpTime, commitTimestamp, statements); + "onPreparedTransactionCommit() failed", + !onPreparedTransactionCommitThrowsException); + preparedTransactionCommitted = true; + onPreparedTransactionCommitFn(commitOplogEntryOpTime, commitTimestamp, statements); } void OpObserverMock::onTransactionAbort(OperationContext* opCtx, @@ -446,7 +464,7 @@ TEST_F(TxnParticipantTest, AbortClearsStoredStatements) { // This test makes sure the commit machinery works even when no operations are done on the // transaction. -TEST_F(TxnParticipantTest, EmptyTransactionCommit) { +TEST_F(TxnParticipantTest, EmptyUnpreparedTransactionCommit) { auto sessionCheckout = checkOutSession(); auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction"); @@ -505,18 +523,17 @@ TEST_F(TxnParticipantTest, CommitTransactionSetsCommitTimestampOnPreparedTransac const auto prepareTimestamp = txnParticipant.prepareTransaction(opCtx(), {}); const auto commitTS = Timestamp(prepareTimestamp.getSecs(), prepareTimestamp.getInc() + 1); - auto originalFn = _opObserver->onTransactionCommitFn; - _opObserver->onTransactionCommitFn = [&](boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp, - std::vector<repl::ReplOperation>& statements) { - originalFn(commitOplogEntryOpTime, commitTimestamp, statements); - ASSERT(commitOplogEntryOpTime); - ASSERT(commitTimestamp); + auto originalFn = _opObserver->onPreparedTransactionCommitFn; + _opObserver->onPreparedTransactionCommitFn = + [&](OplogSlot commitOplogEntryOpTime, + Timestamp commitTimestamp, + const std::vector<repl::ReplOperation>& statements) { + originalFn(commitOplogEntryOpTime, commitTimestamp, statements); - ASSERT_GT(*commitTimestamp, prepareTimestamp); + ASSERT_GT(commitTimestamp, prepareTimestamp); - ASSERT(statements.empty()); - }; + ASSERT(statements.empty()); + }; txnParticipant.commitPreparedTransaction(opCtx(), commitTS, {}); @@ -545,16 +562,13 @@ TEST_F(TxnParticipantTest, CommitTransactionWithCommitTimestampFailsOnUnprepared TEST_F(TxnParticipantTest, CommitTransactionDoesNotSetCommitTimestampOnUnpreparedTransaction) { auto sessionCheckout = checkOutSession(); - auto originalFn = _opObserver->onTransactionCommitFn; - _opObserver->onTransactionCommitFn = [&](boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp, - std::vector<repl::ReplOperation>& statements) { - originalFn(commitOplogEntryOpTime, commitTimestamp, statements); - ASSERT_FALSE(commitOplogEntryOpTime); - ASSERT_FALSE(commitTimestamp); - ASSERT(opCtx()->recoveryUnit()->getCommitTimestamp().isNull()); - ASSERT(statements.empty()); - }; + auto originalFn = _opObserver->onUnpreparedTransactionCommitFn; + _opObserver->onUnpreparedTransactionCommitFn = + [&](const std::vector<repl::ReplOperation>& statements) { + originalFn(statements); + ASSERT(opCtx()->recoveryUnit()->getCommitTimestamp().isNull()); + ASSERT(statements.empty()); + }; auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction"); @@ -738,7 +752,7 @@ TEST_F(TxnParticipantTest, StepDownAfterPrepareDoesNotBlockThenCommit) { runFunctionFromDifferentOpCtx(func); txnParticipant.commitPreparedTransaction(opCtx(), commitTS, {}); - ASSERT(_opObserver->transactionCommitted); + ASSERT(_opObserver->preparedTransactionCommitted); ASSERT(txnParticipant.transactionIsCommitted()); } @@ -782,31 +796,17 @@ TEST_F(TxnParticipantTest, StepDownDuringPreparedCommitFails) { ErrorCodes::NotMaster); } -DEATH_TEST_F(TxnParticipantTest, - ThrowDuringPreparedOnTransactionCommitIsFatal, - "Caught exception during commit") { - auto sessionCheckout = checkOutSession(); - auto txnParticipant = TransactionParticipant::get(opCtx()); - txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction"); - - _opObserver->onTransactionCommitThrowsException = true; - const auto prepareTimestamp = txnParticipant.prepareTransaction(opCtx(), {}); - const auto commitTS = Timestamp(prepareTimestamp.getSecs(), prepareTimestamp.getInc() + 1); - - txnParticipant.commitPreparedTransaction(opCtx(), commitTS, {}); -} - TEST_F(TxnParticipantTest, ThrowDuringUnpreparedCommitLetsTheAbortAtEntryPointToCleanUp) { auto sessionCheckout = checkOutSession(); auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction"); - _opObserver->onTransactionCommitThrowsException = true; + _opObserver->onUnpreparedTransactionCommitThrowsException = true; ASSERT_THROWS_CODE(txnParticipant.commitUnpreparedTransaction(opCtx()), AssertionException, ErrorCodes::OperationFailed); - ASSERT_FALSE(_opObserver->transactionCommitted); + ASSERT_FALSE(_opObserver->unpreparedTransactionCommitted); ASSERT_FALSE(txnParticipant.transactionIsAborted()); ASSERT_FALSE(txnParticipant.transactionIsCommitted()); @@ -983,13 +983,14 @@ DEATH_TEST_F(TxnParticipantTest, auto prepareOpTime = ServerTransactionsMetrics::get(opCtx())->getOldestActiveOpTime(); ASSERT_EQ(prepareOpTime->getTimestamp(), prepareTimestamp); - _opObserver->onTransactionCommitFn = [&](boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp, - std::vector<repl::ReplOperation>& statements) { - // Hit an invariant. This should never happen. - txnParticipant.abortActiveTransaction(opCtx()); - ASSERT_FALSE(txnParticipant.transactionIsAborted()); - }; + _opObserver->onPreparedTransactionCommitFn = + [&](OplogSlot commitOplogEntryOpTime, + Timestamp commitTimestamp, + const std::vector<repl::ReplOperation>& statements) { + // Hit an invariant. This should never happen. + txnParticipant.abortActiveTransaction(opCtx()); + ASSERT_FALSE(txnParticipant.transactionIsAborted()); + }; txnParticipant.commitPreparedTransaction(opCtx(), commitTS, {}); // Check that we removed the prepareTimestamp from the set. |