summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorBlake Oler <blake.oler@mongodb.com>2019-02-13 12:44:39 -0500
committerBlake Oler <blake.oler@mongodb.com>2019-02-20 15:41:41 -0500
commitbdd0c8ff4cb70b3c14e2dfbe68c0baa3b26a6e82 (patch)
tree9add6eba3de4aebaf6b1e9b6b2be1ef53059598e /src/mongo
parent0d79175a88ee958722c0ffb276606949e695d028 (diff)
downloadmongo-bdd0c8ff4cb70b3c14e2dfbe68c0baa3b26a6e82.tar.gz
SERVER-39561 Split OpObserver::onTransactionCommit() into two functions for unprepared and prepared transactions respectively
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/auth/auth_op_observer.h12
-rw-r--r--src/mongo/db/catalog/uuid_catalog.h11
-rw-r--r--src/mongo/db/free_mon/free_mon_op_observer.h12
-rw-r--r--src/mongo/db/op_observer.h29
-rw-r--r--src/mongo/db/op_observer_impl.cpp44
-rw-r--r--src/mongo/db/op_observer_impl.h11
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp46
-rw-r--r--src/mongo/db/op_observer_noop.h11
-rw-r--r--src/mongo/db/op_observer_registry.h19
-rw-r--r--src/mongo/db/repl/do_txn_test.cpp46
-rw-r--r--src/mongo/db/s/config_server_op_observer.h12
-rw-r--r--src/mongo/db/s/shard_server_op_observer.h12
-rw-r--r--src/mongo/db/transaction_participant.cpp5
-rw-r--r--src/mongo/db/transaction_participant_retryable_writes_test.cpp49
-rw-r--r--src/mongo/db/transaction_participant_test.cpp149
15 files changed, 268 insertions, 200 deletions
diff --git a/src/mongo/db/auth/auth_op_observer.h b/src/mongo/db/auth/auth_op_observer.h
index 1f0280647eb..a8afc4baefc 100644
--- a/src/mongo/db/auth/auth_op_observer.h
+++ b/src/mongo/db/auth/auth_op_observer.h
@@ -156,10 +156,14 @@ public:
const NamespaceString& collectionName,
OptionalCollectionUUID uuid) final;
- void onTransactionCommit(OperationContext* opCtx,
- boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp,
- std::vector<repl::ReplOperation>& statements) final {}
+ void onUnpreparedTransactionCommit(OperationContext* opCtx,
+ const std::vector<repl::ReplOperation>& statements) final {}
+
+ void onPreparedTransactionCommit(
+ OperationContext* opCtx,
+ OplogSlot commitOplogEntryOpTime,
+ Timestamp commitTimestamp,
+ const std::vector<repl::ReplOperation>& statements) noexcept final {}
void onTransactionPrepare(OperationContext* opCtx,
const OplogSlot& prepareOpTime,
diff --git a/src/mongo/db/catalog/uuid_catalog.h b/src/mongo/db/catalog/uuid_catalog.h
index efe923cf216..ea86b990eee 100644
--- a/src/mongo/db/catalog/uuid_catalog.h
+++ b/src/mongo/db/catalog/uuid_catalog.h
@@ -152,10 +152,13 @@ public:
void onEmptyCapped(OperationContext* opCtx,
const NamespaceString& collectionName,
OptionalCollectionUUID uuid) override {}
- void onTransactionCommit(OperationContext* opCtx,
- boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp,
- std::vector<repl::ReplOperation>& statements) override {}
+ void onUnpreparedTransactionCommit(
+ OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) override {}
+ void onPreparedTransactionCommit(
+ OperationContext* opCtx,
+ OplogSlot commitOplogEntryOpTime,
+ Timestamp commitTimestamp,
+ const std::vector<repl::ReplOperation>& statements) noexcept override {}
void onTransactionPrepare(OperationContext* opCtx,
const OplogSlot& prepareOpTime,
std::vector<repl::ReplOperation>& statements) override {}
diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h
index bd266a8d578..9aec975e8ec 100644
--- a/src/mongo/db/free_mon/free_mon_op_observer.h
+++ b/src/mongo/db/free_mon/free_mon_op_observer.h
@@ -157,10 +157,14 @@ public:
const NamespaceString& collectionName,
OptionalCollectionUUID uuid) final {}
- void onTransactionCommit(OperationContext* opCtx,
- boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp,
- std::vector<repl::ReplOperation>& statements) final {}
+ void onUnpreparedTransactionCommit(OperationContext* opCtx,
+ const std::vector<repl::ReplOperation>& statements) final {}
+
+ void onPreparedTransactionCommit(
+ OperationContext* opCtx,
+ OplogSlot commitOplogEntryOpTime,
+ Timestamp commitTimestamp,
+ const std::vector<repl::ReplOperation>& statements) noexcept final {}
void onTransactionPrepare(OperationContext* opCtx,
const OplogSlot& prepareOpTime,
diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h
index 88d7294b1f2..a6664937df6 100644
--- a/src/mongo/db/op_observer.h
+++ b/src/mongo/db/op_observer.h
@@ -279,20 +279,31 @@ public:
virtual void onEmptyCapped(OperationContext* opCtx,
const NamespaceString& collectionName,
OptionalCollectionUUID uuid) = 0;
+
+ /**
+ * The onUnpreparedTransactionCommit method is called on the commit of an unprepared
+ * transaction, before the RecoveryUnit onCommit() is called. It must not be called when no
+ * transaction is active.
+ *
+ * The 'statements' are the list of CRUD operations to be applied in this transaction.
+ */
+ virtual void onUnpreparedTransactionCommit(
+ OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) = 0;
/**
- * The onTransactionCommit method is called on the commit of an atomic transaction, before the
- * RecoveryUnit onCommit() is called. It must not be called when no transaction is active.
+ * The onPreparedTransactionCommit method is called on the commit of a prepared transaction,
+ * after the RecoveryUnit onCommit() is called. It must not be called when no transaction is
+ * active.
*
- * If the transaction was prepared, then 'commitOplogEntryOpTime' is passed in to be used as the
- * OpTime of the oplog entry. The 'commitTimestamp' is the timestamp at which the multi-document
- * transaction was committed. Either these fields should both be 'none' or neither should.
+ * The 'commitOplogEntryOpTime' is passed in to be used as the OpTime of the oplog entry. The
+ * 'commitTimestamp' is the timestamp at which the multi-document transaction was committed.
*
* The 'statements' are the list of CRUD operations to be applied in this transaction.
*/
- virtual void onTransactionCommit(OperationContext* opCtx,
- boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp,
- std::vector<repl::ReplOperation>& statements) = 0;
+ virtual void onPreparedTransactionCommit(
+ OperationContext* opCtx,
+ OplogSlot commitOplogEntryOpTime,
+ Timestamp commitTimestamp,
+ const std::vector<repl::ReplOperation>& statements) noexcept = 0;
/**
* The onTransactionPrepare method is called when an atomic transaction is prepared. It must be
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 17db14347b9..6996a5e772c 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -1096,36 +1096,40 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx,
} // namespace
-void OpObserverImpl::onTransactionCommit(OperationContext* opCtx,
- boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp,
- std::vector<repl::ReplOperation>& statements) {
+void OpObserverImpl::onUnpreparedTransactionCommit(
+ OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) {
invariant(opCtx->getTxnNumber());
if (!opCtx->writesAreReplicated()) {
return;
}
- if (commitOplogEntryOpTime) {
- invariant(commitTimestamp);
- invariant(!commitTimestamp->isNull());
+ // It is possible that the transaction resulted in no changes. In that case, we should
+ // not write an empty applyOps entry.
+ if (statements.empty())
+ return;
- CommitTransactionOplogObject cmdObj;
- cmdObj.setCommitTimestamp(*commitTimestamp);
- logCommitOrAbortForPreparedTransaction(
- opCtx, *commitOplogEntryOpTime, cmdObj.toBSON(), DurableTxnStateEnum::kCommitted);
- } else {
- invariant(!commitTimestamp);
+ const auto commitOpTime = logApplyOpsForTransaction(opCtx, statements, OplogSlot()).writeOpTime;
+ invariant(!commitOpTime.isNull());
+}
- // It is possible that the transaction resulted in no changes. In that case, we should
- // not write an empty applyOps entry.
- if (statements.empty())
- return;
+void OpObserverImpl::onPreparedTransactionCommit(
+ OperationContext* opCtx,
+ OplogSlot commitOplogEntryOpTime,
+ Timestamp commitTimestamp,
+ const std::vector<repl::ReplOperation>& statements) noexcept {
+ invariant(opCtx->getTxnNumber());
- const auto commitOpTime =
- logApplyOpsForTransaction(opCtx, statements, OplogSlot()).writeOpTime;
- invariant(!commitOpTime.isNull());
+ if (!opCtx->writesAreReplicated()) {
+ return;
}
+
+ invariant(!commitTimestamp.isNull());
+
+ CommitTransactionOplogObject cmdObj;
+ cmdObj.setCommitTimestamp(commitTimestamp);
+ logCommitOrAbortForPreparedTransaction(
+ opCtx, commitOplogEntryOpTime, cmdObj.toBSON(), DurableTxnStateEnum::kCommitted);
}
void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h
index d9f5d701227..bfec7d9f41e 100644
--- a/src/mongo/db/op_observer_impl.h
+++ b/src/mongo/db/op_observer_impl.h
@@ -137,10 +137,13 @@ public:
void onEmptyCapped(OperationContext* opCtx,
const NamespaceString& collectionName,
OptionalCollectionUUID uuid);
- void onTransactionCommit(OperationContext* opCtx,
- boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp,
- std::vector<repl::ReplOperation>& statements) final;
+ void onUnpreparedTransactionCommit(OperationContext* opCtx,
+ const std::vector<repl::ReplOperation>& statements) final;
+ void onPreparedTransactionCommit(
+ OperationContext* opCtx,
+ OplogSlot commitOplogEntryOpTime,
+ Timestamp commitTimestamp,
+ const std::vector<repl::ReplOperation>& statements) noexcept final;
void onTransactionPrepare(OperationContext* opCtx,
const OplogSlot& prepareOpTime,
std::vector<repl::ReplOperation>& statments) final;
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index 48877eee1a0..6c4828710e3 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -683,11 +683,8 @@ TEST_F(OpObserverLargeTransactionTest, TransactionTooLargeWhileCommitting) {
<< BSONBinData(halfTransactionData.get(), kHalfTransactionSize, BinDataGeneral)));
txnParticipant.addTransactionOperation(opCtx(), operation);
txnParticipant.addTransactionOperation(opCtx(), operation);
- ASSERT_THROWS_CODE(opObserver().onTransactionCommit(
- opCtx(),
- boost::none,
- boost::none,
- txnParticipant.retrieveCompletedTransactionOperations(opCtx())),
+ ASSERT_THROWS_CODE(opObserver().onUnpreparedTransactionCommit(
+ opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx())),
AssertionException,
ErrorCodes::TransactionTooLarge);
}
@@ -820,7 +817,7 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedCommitTest) {
opCtx()->lockState()->unsetMaxLockTimeout();
txnParticipant.transitionToCommittingWithPrepareforTest(opCtx());
- opObserver().onTransactionCommit(
+ opObserver().onPreparedTransactionCommit(
opCtx(),
commitSlot,
prepareTimestamp,
@@ -1054,11 +1051,8 @@ TEST_F(OpObserverTransactionTest, CommittingUnpreparedNonEmptyTransactionWritesT
opObserver().onInserts(opCtx(), nss, uuid, insert.begin(), insert.end(), false);
}
- opObserver().onTransactionCommit(
- opCtx(),
- boost::none,
- boost::none,
- txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ opObserver().onUnpreparedTransactionCommit(
+ opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
opCtx()->getWriteUnitOfWork()->commit();
assertTxnRecord(txnNum(), {}, DurableTxnStateEnum::kCommitted);
@@ -1069,11 +1063,8 @@ TEST_F(OpObserverTransactionTest,
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction");
- opObserver().onTransactionCommit(
- opCtx(),
- boost::none,
- boost::none,
- txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ opObserver().onUnpreparedTransactionCommit(
+ opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
txnParticipant.stashTransactionResources(opCtx());
@@ -1107,7 +1098,7 @@ TEST_F(OpObserverTransactionTest, CommittingPreparedTransactionWritesToTransacti
opCtx()->lockState()->unsetMaxLockTimeout();
txnParticipant.transitionToCommittingWithPrepareforTest(opCtx());
- opObserver().onTransactionCommit(
+ opObserver().onPreparedTransactionCommit(
opCtx(),
commitSlot,
prepareOpTime.getTimestamp(),
@@ -1143,11 +1134,8 @@ TEST_F(OpObserverTransactionTest, TransactionalInsertTest) {
AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false);
- opObserver().onTransactionCommit(
- opCtx(),
- boost::none,
- boost::none,
- txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ opObserver().onUnpreparedTransactionCommit(
+ opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
auto oplogEntryObj = getSingleOplogEntry(opCtx());
checkCommonFields(oplogEntryObj);
OplogEntry oplogEntry = assertGet(OplogEntry::parse(oplogEntryObj));
@@ -1224,11 +1212,8 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTest) {
AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
opObserver().onUpdate(opCtx(), update1);
opObserver().onUpdate(opCtx(), update2);
- opObserver().onTransactionCommit(
- opCtx(),
- boost::none,
- boost::none,
- txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ opObserver().onUnpreparedTransactionCommit(
+ opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
auto oplogEntry = getSingleOplogEntry(opCtx());
checkCommonFields(oplogEntry);
auto o = oplogEntry.getObjectField("o");
@@ -1281,11 +1266,8 @@ TEST_F(OpObserverTransactionTest, TransactionalDeleteTest) {
BSON("_id" << 1 << "data"
<< "y"));
opObserver().onDelete(opCtx(), nss2, uuid2, 0, false, boost::none);
- opObserver().onTransactionCommit(
- opCtx(),
- boost::none,
- boost::none,
- txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ opObserver().onUnpreparedTransactionCommit(
+ opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
auto oplogEntry = getSingleOplogEntry(opCtx());
checkCommonFields(oplogEntry);
auto o = oplogEntry.getObjectField("o");
diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h
index 225bf6995e5..b4290305eca 100644
--- a/src/mongo/db/op_observer_noop.h
+++ b/src/mongo/db/op_observer_noop.h
@@ -136,10 +136,13 @@ public:
void onEmptyCapped(OperationContext* opCtx,
const NamespaceString& collectionName,
OptionalCollectionUUID uuid) override {}
- void onTransactionCommit(OperationContext* opCtx,
- boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp,
- std::vector<repl::ReplOperation>& statements) override{};
+ void onUnpreparedTransactionCommit(
+ OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) override{};
+ void onPreparedTransactionCommit(
+ OperationContext* opCtx,
+ OplogSlot commitOplogEntryOpTime,
+ Timestamp commitTimestamp,
+ const std::vector<repl::ReplOperation>& statements) noexcept override{};
void onTransactionPrepare(OperationContext* opCtx,
const OplogSlot& prepareOpTime,
std::vector<repl::ReplOperation>& statements) override{};
diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h
index 6ea895855d1..946ee8893e9 100644
--- a/src/mongo/db/op_observer_registry.h
+++ b/src/mongo/db/op_observer_registry.h
@@ -258,13 +258,22 @@ public:
o->onEmptyCapped(opCtx, collectionName, uuid);
}
- void onTransactionCommit(OperationContext* opCtx,
- boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp,
- std::vector<repl::ReplOperation>& statements) override {
+ void onUnpreparedTransactionCommit(
+ OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) override {
ReservedTimes times{opCtx};
for (auto& o : _observers)
- o->onTransactionCommit(opCtx, commitOplogEntryOpTime, commitTimestamp, statements);
+ o->onUnpreparedTransactionCommit(opCtx, statements);
+ }
+
+ void onPreparedTransactionCommit(
+ OperationContext* opCtx,
+ OplogSlot commitOplogEntryOpTime,
+ Timestamp commitTimestamp,
+ const std::vector<repl::ReplOperation>& statements) noexcept override {
+ ReservedTimes times{opCtx};
+ for (auto& o : _observers)
+ o->onPreparedTransactionCommit(
+ opCtx, commitOplogEntryOpTime, commitTimestamp, statements);
}
void onTransactionPrepare(OperationContext* opCtx,
diff --git a/src/mongo/db/repl/do_txn_test.cpp b/src/mongo/db/repl/do_txn_test.cpp
index 8bd9f3c6d39..97174546e31 100644
--- a/src/mongo/db/repl/do_txn_test.cpp
+++ b/src/mongo/db/repl/do_txn_test.cpp
@@ -52,6 +52,13 @@ namespace mongo {
namespace repl {
namespace {
+boost::optional<OplogEntry> onAllTransactionCommit(OperationContext* opCtx) {
+ OplogInterfaceLocal oplogInterface(opCtx, NamespaceString::kRsOplogNamespace.ns());
+ auto oplogIter = oplogInterface.makeIterator();
+ auto opEntry = unittest::assertGet(oplogIter->next());
+ return unittest::assertGet(OplogEntry::parse(opEntry.first));
+}
+
/**
* Mock OpObserver that tracks doTxn commit events.
*/
@@ -60,27 +67,36 @@ public:
/**
* Called by doTxn() when ops are ready to commit.
*/
- void onTransactionCommit(OperationContext* opCtx,
- boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp,
- std::vector<repl::ReplOperation>& statements) override;
+ void onUnpreparedTransactionCommit(OperationContext* opCtx,
+ const std::vector<repl::ReplOperation>& statements) override;
+
+
+ /**
+ * Called by doTxn() when ops are ready to commit.
+ */
+ void onPreparedTransactionCommit(
+ OperationContext* opCtx,
+ OplogSlot commitOplogEntryOpTime,
+ Timestamp commitTimestamp,
+ const std::vector<repl::ReplOperation>& statements) noexcept override;
// If present, holds the applyOps oplog entry written out by the ObObserverImpl
- // onTransactionCommit.
+ // onPreparedTransactionCommit or onUnpreparedTransactionCommit.
boost::optional<OplogEntry> applyOpsOplogEntry;
};
-void OpObserverMock::onTransactionCommit(OperationContext* opCtx,
- boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp,
- std::vector<repl::ReplOperation>& statements) {
- ASSERT(!commitOplogEntryOpTime) << commitOplogEntryOpTime->opTime;
- ASSERT(!commitTimestamp) << *commitTimestamp;
+void OpObserverMock::onUnpreparedTransactionCommit(
+ OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) {
- OplogInterfaceLocal oplogInterface(opCtx, NamespaceString::kRsOplogNamespace.ns());
- auto oplogIter = oplogInterface.makeIterator();
- auto opEntry = unittest::assertGet(oplogIter->next());
- applyOpsOplogEntry = unittest::assertGet(OplogEntry::parse(opEntry.first));
+ applyOpsOplogEntry = onAllTransactionCommit(opCtx);
+}
+
+void OpObserverMock::onPreparedTransactionCommit(
+ OperationContext* opCtx,
+ OplogSlot commitOplogEntryOpTime,
+ Timestamp commitTimestamp,
+ const std::vector<repl::ReplOperation>& statements) noexcept {
+ applyOpsOplogEntry = onAllTransactionCommit(opCtx);
}
/**
diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h
index 8199e4b32a8..2ab3ea5e89b 100644
--- a/src/mongo/db/s/config_server_op_observer.h
+++ b/src/mongo/db/s/config_server_op_observer.h
@@ -157,10 +157,14 @@ public:
const NamespaceString& collectionName,
OptionalCollectionUUID uuid) override {}
- void onTransactionCommit(OperationContext* opCtx,
- boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp,
- std::vector<repl::ReplOperation>& statements) override {}
+ void onUnpreparedTransactionCommit(
+ OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) override {}
+
+ void onPreparedTransactionCommit(
+ OperationContext* opCtx,
+ OplogSlot commitOplogEntryOpTime,
+ Timestamp commitTimestamp,
+ const std::vector<repl::ReplOperation>& statements) noexcept override {}
void onTransactionPrepare(OperationContext* opCtx,
const OplogSlot& prepareOpTime,
diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h
index 6667715622c..8703db1dfce 100644
--- a/src/mongo/db/s/shard_server_op_observer.h
+++ b/src/mongo/db/s/shard_server_op_observer.h
@@ -158,10 +158,14 @@ public:
const NamespaceString& collectionName,
OptionalCollectionUUID uuid) override {}
- void onTransactionCommit(OperationContext* opCtx,
- boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp,
- std::vector<repl::ReplOperation>& statements) override {}
+ void onUnpreparedTransactionCommit(
+ OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) override {}
+
+ void onPreparedTransactionCommit(
+ OperationContext* opCtx,
+ OplogSlot commitOplogEntryOpTime,
+ Timestamp commitTimestamp,
+ const std::vector<repl::ReplOperation>& statements) noexcept override {}
void onTransactionPrepare(OperationContext* opCtx,
const OplogSlot& prepareOpTime,
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 941fa6c5b06..5a7a781e3a2 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -1049,8 +1049,7 @@ void TransactionParticipant::Participant::commitUnpreparedTransaction(OperationC
auto opObserver = opCtx->getServiceContext()->getOpObserver();
invariant(opObserver);
- opObserver->onTransactionCommit(
- opCtx, boost::none, boost::none, retrieveCompletedTransactionOperations(opCtx));
+ opObserver->onUnpreparedTransactionCommit(opCtx, retrieveCompletedTransactionOperations(opCtx));
clearOperationsInMemory(opCtx);
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
@@ -1129,7 +1128,7 @@ void TransactionParticipant::Participant::commitPreparedTransaction(
invariant(opObserver);
// Once the transaction is committed, the oplog entry must be written.
- opObserver->onTransactionCommit(
+ opObserver->onPreparedTransactionCommit(
opCtx, commitOplogSlot, commitTimestamp, retrieveCompletedTransactionOperations(opCtx));
clearOperationsInMemory(opCtx);
diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
index 5271df46282..30d8274aed7 100644
--- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp
+++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
@@ -103,27 +103,48 @@ public:
bool transactionPrepared = false;
stdx::function<void()> onTransactionPrepareFn = [this]() { transactionPrepared = true; };
- void onTransactionCommit(OperationContext* opCtx,
- boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp,
- std::vector<repl::ReplOperation>& statements) override {
+ void onUnpreparedTransactionCommit(
+ OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) override {
ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork());
- OpObserverNoop::onTransactionCommit(
+ OpObserverNoop::onUnpreparedTransactionCommit(opCtx, statements);
+
+ uassert(ErrorCodes::OperationFailed,
+ "onUnpreparedTransactionCommit() failed",
+ !onUnpreparedTransactionCommitThrowsException);
+
+ onUnpreparedTransactionCommitFn();
+ }
+
+ bool onUnpreparedTransactionCommitThrowsException = false;
+ bool unpreparedTransactionCommitted = false;
+
+ stdx::function<void()> onUnpreparedTransactionCommitFn = [this]() {
+ unpreparedTransactionCommitted = true;
+ };
+
+
+ void onPreparedTransactionCommit(
+ OperationContext* opCtx,
+ OplogSlot commitOplogEntryOpTime,
+ Timestamp commitTimestamp,
+ const std::vector<repl::ReplOperation>& statements) noexcept override {
+ ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork());
+ OpObserverNoop::onPreparedTransactionCommit(
opCtx, commitOplogEntryOpTime, commitTimestamp, statements);
uassert(ErrorCodes::OperationFailed,
- "onTransactionCommit() failed",
- !onTransactionCommitThrowsException);
+ "onPreparedTransactionCommit() failed",
+ !onPreparedTransactionCommitThrowsException);
- onTransactionCommitFn(commitOplogEntryOpTime, commitTimestamp);
+ onPreparedTransactionCommitFn(commitOplogEntryOpTime, commitTimestamp);
}
- bool onTransactionCommitThrowsException = false;
- bool transactionCommitted = false;
- stdx::function<void(boost::optional<OplogSlot>, boost::optional<Timestamp>)>
- onTransactionCommitFn =
- [this](boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp) { transactionCommitted = true; };
+ bool onPreparedTransactionCommitThrowsException = false;
+ bool preparedTransactionCommitted = false;
+ stdx::function<void(OplogSlot, Timestamp)> onPreparedTransactionCommitFn =
+ [this](OplogSlot commitOplogEntryOpTime, Timestamp commitTimestamp) {
+ preparedTransactionCommitted = true;
+ };
repl::OpTime onDropCollection(OperationContext* opCtx,
const NamespaceString& collectionName,
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index adf777840fa..731666fb6e4 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -98,17 +98,25 @@ public:
bool transactionPrepared = false;
stdx::function<void()> onTransactionPrepareFn = []() {};
- void onTransactionCommit(OperationContext* opCtx,
- boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp,
- std::vector<repl::ReplOperation>& statements) override;
- bool onTransactionCommitThrowsException = false;
- bool transactionCommitted = false;
- stdx::function<void(
- boost::optional<OplogSlot>, boost::optional<Timestamp>, std::vector<repl::ReplOperation>&)>
- onTransactionCommitFn = [](boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp,
- std::vector<repl::ReplOperation>& statements) {};
+ void onUnpreparedTransactionCommit(OperationContext* opCtx,
+ const std::vector<repl::ReplOperation>& statements) override;
+ bool onUnpreparedTransactionCommitThrowsException = false;
+ bool unpreparedTransactionCommitted = false;
+ stdx::function<void(const std::vector<repl::ReplOperation>&)> onUnpreparedTransactionCommitFn =
+ [](const std::vector<repl::ReplOperation>& statements) {};
+
+
+ void onPreparedTransactionCommit(
+ OperationContext* opCtx,
+ OplogSlot commitOplogEntryOpTime,
+ Timestamp commitTimestamp,
+ const std::vector<repl::ReplOperation>& statements) noexcept override;
+ bool onPreparedTransactionCommitThrowsException = false;
+ bool preparedTransactionCommitted = false;
+ stdx::function<void(OplogSlot, Timestamp, const std::vector<repl::ReplOperation>&)>
+ onPreparedTransactionCommitFn = [](OplogSlot commitOplogEntryOpTime,
+ Timestamp commitTimestamp,
+ const std::vector<repl::ReplOperation>& statements) {};
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override;
@@ -138,26 +146,36 @@ void OpObserverMock::onTransactionPrepare(OperationContext* opCtx,
onTransactionPrepareFn();
}
-void OpObserverMock::onTransactionCommit(OperationContext* opCtx,
- boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp,
- std::vector<repl::ReplOperation>& statements) {
- if (commitOplogEntryOpTime) {
- invariant(commitTimestamp);
- ASSERT_FALSE(opCtx->lockState()->inAWriteUnitOfWork());
- // The 'commitTimestamp' must be cleared before we write the oplog entry.
- ASSERT(opCtx->recoveryUnit()->getCommitTimestamp().isNull());
- } else {
- invariant(!commitTimestamp);
- ASSERT(opCtx->lockState()->inAWriteUnitOfWork());
- }
+void OpObserverMock::onUnpreparedTransactionCommit(
+ OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) {
+ ASSERT(opCtx->lockState()->inAWriteUnitOfWork());
+
+ OpObserverNoop::onUnpreparedTransactionCommit(opCtx, statements);
+
+ uassert(ErrorCodes::OperationFailed,
+ "onUnpreparedTransactionCommit() failed",
+ !onUnpreparedTransactionCommitThrowsException);
+
+ unpreparedTransactionCommitted = true;
+ onUnpreparedTransactionCommitFn(statements);
+}
- OpObserverNoop::onTransactionCommit(opCtx, commitOplogEntryOpTime, commitTimestamp, statements);
+void OpObserverMock::onPreparedTransactionCommit(
+ OperationContext* opCtx,
+ OplogSlot commitOplogEntryOpTime,
+ Timestamp commitTimestamp,
+ const std::vector<repl::ReplOperation>& statements) noexcept {
+ ASSERT_FALSE(opCtx->lockState()->inAWriteUnitOfWork());
+ // The 'commitTimestamp' must be cleared before we write the oplog entry.
+ ASSERT(opCtx->recoveryUnit()->getCommitTimestamp().isNull());
+
+ OpObserverNoop::onPreparedTransactionCommit(
+ opCtx, commitOplogEntryOpTime, commitTimestamp, statements);
uassert(ErrorCodes::OperationFailed,
- "onTransactionCommit() failed",
- !onTransactionCommitThrowsException);
- transactionCommitted = true;
- onTransactionCommitFn(commitOplogEntryOpTime, commitTimestamp, statements);
+ "onPreparedTransactionCommit() failed",
+ !onPreparedTransactionCommitThrowsException);
+ preparedTransactionCommitted = true;
+ onPreparedTransactionCommitFn(commitOplogEntryOpTime, commitTimestamp, statements);
}
void OpObserverMock::onTransactionAbort(OperationContext* opCtx,
@@ -446,7 +464,7 @@ TEST_F(TxnParticipantTest, AbortClearsStoredStatements) {
// This test makes sure the commit machinery works even when no operations are done on the
// transaction.
-TEST_F(TxnParticipantTest, EmptyTransactionCommit) {
+TEST_F(TxnParticipantTest, EmptyUnpreparedTransactionCommit) {
auto sessionCheckout = checkOutSession();
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction");
@@ -505,18 +523,17 @@ TEST_F(TxnParticipantTest, CommitTransactionSetsCommitTimestampOnPreparedTransac
const auto prepareTimestamp = txnParticipant.prepareTransaction(opCtx(), {});
const auto commitTS = Timestamp(prepareTimestamp.getSecs(), prepareTimestamp.getInc() + 1);
- auto originalFn = _opObserver->onTransactionCommitFn;
- _opObserver->onTransactionCommitFn = [&](boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp,
- std::vector<repl::ReplOperation>& statements) {
- originalFn(commitOplogEntryOpTime, commitTimestamp, statements);
- ASSERT(commitOplogEntryOpTime);
- ASSERT(commitTimestamp);
+ auto originalFn = _opObserver->onPreparedTransactionCommitFn;
+ _opObserver->onPreparedTransactionCommitFn =
+ [&](OplogSlot commitOplogEntryOpTime,
+ Timestamp commitTimestamp,
+ const std::vector<repl::ReplOperation>& statements) {
+ originalFn(commitOplogEntryOpTime, commitTimestamp, statements);
- ASSERT_GT(*commitTimestamp, prepareTimestamp);
+ ASSERT_GT(commitTimestamp, prepareTimestamp);
- ASSERT(statements.empty());
- };
+ ASSERT(statements.empty());
+ };
txnParticipant.commitPreparedTransaction(opCtx(), commitTS, {});
@@ -545,16 +562,13 @@ TEST_F(TxnParticipantTest, CommitTransactionWithCommitTimestampFailsOnUnprepared
TEST_F(TxnParticipantTest, CommitTransactionDoesNotSetCommitTimestampOnUnpreparedTransaction) {
auto sessionCheckout = checkOutSession();
- auto originalFn = _opObserver->onTransactionCommitFn;
- _opObserver->onTransactionCommitFn = [&](boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp,
- std::vector<repl::ReplOperation>& statements) {
- originalFn(commitOplogEntryOpTime, commitTimestamp, statements);
- ASSERT_FALSE(commitOplogEntryOpTime);
- ASSERT_FALSE(commitTimestamp);
- ASSERT(opCtx()->recoveryUnit()->getCommitTimestamp().isNull());
- ASSERT(statements.empty());
- };
+ auto originalFn = _opObserver->onUnpreparedTransactionCommitFn;
+ _opObserver->onUnpreparedTransactionCommitFn =
+ [&](const std::vector<repl::ReplOperation>& statements) {
+ originalFn(statements);
+ ASSERT(opCtx()->recoveryUnit()->getCommitTimestamp().isNull());
+ ASSERT(statements.empty());
+ };
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction");
@@ -738,7 +752,7 @@ TEST_F(TxnParticipantTest, StepDownAfterPrepareDoesNotBlockThenCommit) {
runFunctionFromDifferentOpCtx(func);
txnParticipant.commitPreparedTransaction(opCtx(), commitTS, {});
- ASSERT(_opObserver->transactionCommitted);
+ ASSERT(_opObserver->preparedTransactionCommitted);
ASSERT(txnParticipant.transactionIsCommitted());
}
@@ -782,31 +796,17 @@ TEST_F(TxnParticipantTest, StepDownDuringPreparedCommitFails) {
ErrorCodes::NotMaster);
}
-DEATH_TEST_F(TxnParticipantTest,
- ThrowDuringPreparedOnTransactionCommitIsFatal,
- "Caught exception during commit") {
- auto sessionCheckout = checkOutSession();
- auto txnParticipant = TransactionParticipant::get(opCtx());
- txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction");
-
- _opObserver->onTransactionCommitThrowsException = true;
- const auto prepareTimestamp = txnParticipant.prepareTransaction(opCtx(), {});
- const auto commitTS = Timestamp(prepareTimestamp.getSecs(), prepareTimestamp.getInc() + 1);
-
- txnParticipant.commitPreparedTransaction(opCtx(), commitTS, {});
-}
-
TEST_F(TxnParticipantTest, ThrowDuringUnpreparedCommitLetsTheAbortAtEntryPointToCleanUp) {
auto sessionCheckout = checkOutSession();
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction");
- _opObserver->onTransactionCommitThrowsException = true;
+ _opObserver->onUnpreparedTransactionCommitThrowsException = true;
ASSERT_THROWS_CODE(txnParticipant.commitUnpreparedTransaction(opCtx()),
AssertionException,
ErrorCodes::OperationFailed);
- ASSERT_FALSE(_opObserver->transactionCommitted);
+ ASSERT_FALSE(_opObserver->unpreparedTransactionCommitted);
ASSERT_FALSE(txnParticipant.transactionIsAborted());
ASSERT_FALSE(txnParticipant.transactionIsCommitted());
@@ -983,13 +983,14 @@ DEATH_TEST_F(TxnParticipantTest,
auto prepareOpTime = ServerTransactionsMetrics::get(opCtx())->getOldestActiveOpTime();
ASSERT_EQ(prepareOpTime->getTimestamp(), prepareTimestamp);
- _opObserver->onTransactionCommitFn = [&](boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp,
- std::vector<repl::ReplOperation>& statements) {
- // Hit an invariant. This should never happen.
- txnParticipant.abortActiveTransaction(opCtx());
- ASSERT_FALSE(txnParticipant.transactionIsAborted());
- };
+ _opObserver->onPreparedTransactionCommitFn =
+ [&](OplogSlot commitOplogEntryOpTime,
+ Timestamp commitTimestamp,
+ const std::vector<repl::ReplOperation>& statements) {
+ // Hit an invariant. This should never happen.
+ txnParticipant.abortActiveTransaction(opCtx());
+ ASSERT_FALSE(txnParticipant.transactionIsAborted());
+ };
txnParticipant.commitPreparedTransaction(opCtx(), commitTS, {});
// Check that we removed the prepareTimestamp from the set.