summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBlake Oler <blake.oler@mongodb.com>2019-01-17 14:31:00 -0500
committerBlake Oler <blake.oler@mongodb.com>2019-02-01 14:16:52 -0500
commitb8bfe9ff391ebeb10a5c2fb86979d854d17d0fd5 (patch)
tree86f50503bc3e5e7c51ff2cb0ed42338ad73c6e39
parent8387824b3ac937b0489fcb94c590cc663b47348c (diff)
downloadmongo-b8bfe9ff391ebeb10a5c2fb86979d854d17d0fd5.tar.gz
SERVER-39017 Allow prepared transaction statements to persist in-memory until commit
-rw-r--r--src/mongo/db/auth/auth_op_observer.h7
-rw-r--r--src/mongo/db/catalog/uuid_catalog.h7
-rw-r--r--src/mongo/db/free_mon/free_mon_op_observer.h7
-rw-r--r--src/mongo/db/op_observer.h11
-rw-r--r--src/mongo/db/op_observer_impl.cpp22
-rw-r--r--src/mongo/db/op_observer_impl.h7
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp73
-rw-r--r--src/mongo/db/op_observer_noop.h7
-rw-r--r--src/mongo/db/op_observer_registry.h11
-rw-r--r--src/mongo/db/repl/do_txn_test.cpp6
-rw-r--r--src/mongo/db/s/config_server_op_observer.h7
-rw-r--r--src/mongo/db/s/shard_server_op_observer.h7
-rw-r--r--src/mongo/db/transaction_participant.cpp43
-rw-r--r--src/mongo/db/transaction_participant.h22
-rw-r--r--src/mongo/db/transaction_participant_retryable_writes_test.cpp12
-rw-r--r--src/mongo/db/transaction_participant_test.cpp76
16 files changed, 239 insertions, 86 deletions
diff --git a/src/mongo/db/auth/auth_op_observer.h b/src/mongo/db/auth/auth_op_observer.h
index 93fd54cc7de..918dfaf10ed 100644
--- a/src/mongo/db/auth/auth_op_observer.h
+++ b/src/mongo/db/auth/auth_op_observer.h
@@ -159,9 +159,12 @@ public:
void onTransactionCommit(OperationContext* opCtx,
boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp) final {}
+ boost::optional<Timestamp> commitTimestamp,
+ std::vector<repl::ReplOperation>& statements) final {}
- void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) final {}
+ void onTransactionPrepare(OperationContext* opCtx,
+ const OplogSlot& prepareOpTime,
+ std::vector<repl::ReplOperation>& statements) final {}
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
diff --git a/src/mongo/db/catalog/uuid_catalog.h b/src/mongo/db/catalog/uuid_catalog.h
index e7db04647fe..8d828e74953 100644
--- a/src/mongo/db/catalog/uuid_catalog.h
+++ b/src/mongo/db/catalog/uuid_catalog.h
@@ -155,8 +155,11 @@ public:
OptionalCollectionUUID uuid) override {}
void onTransactionCommit(OperationContext* opCtx,
boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp) override {}
- void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override {}
+ boost::optional<Timestamp> commitTimestamp,
+ std::vector<repl::ReplOperation>& statements) override {}
+ void onTransactionPrepare(OperationContext* opCtx,
+ const OplogSlot& prepareOpTime,
+ std::vector<repl::ReplOperation>& statements) override {}
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override {}
void onReplicationRollback(OperationContext* opCtx,
diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h
index d11fbf00099..3cd4ba4caa0 100644
--- a/src/mongo/db/free_mon/free_mon_op_observer.h
+++ b/src/mongo/db/free_mon/free_mon_op_observer.h
@@ -160,9 +160,12 @@ public:
void onTransactionCommit(OperationContext* opCtx,
boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp) final {}
+ boost::optional<Timestamp> commitTimestamp,
+ std::vector<repl::ReplOperation>& statements) final {}
- void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) final {}
+ void onTransactionPrepare(OperationContext* opCtx,
+ const OplogSlot& prepareOpTime,
+ std::vector<repl::ReplOperation>& statements) final {}
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h
index 682a6f0311d..b0f187adb4f 100644
--- a/src/mongo/db/op_observer.h
+++ b/src/mongo/db/op_observer.h
@@ -287,18 +287,25 @@ public:
* If the transaction was prepared, then 'commitOplogEntryOpTime' is passed in to be used as the
* OpTime of the oplog entry. The 'commitTimestamp' is the timestamp at which the multi-document
* transaction was committed. Either these fields should both be 'none' or neither should.
+ *
+ * The 'statements' are the list of CRUD operations to be applied in this transaction.
*/
virtual void onTransactionCommit(OperationContext* opCtx,
boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp) = 0;
+ boost::optional<Timestamp> commitTimestamp,
+ std::vector<repl::ReplOperation>& statements) = 0;
/**
* The onTransactionPrepare method is called when an atomic transaction is prepared. It must be
* called when a transaction is active.
*
* The 'prepareOpTime' is passed in to be used as the OpTime of the oplog entry.
+ *
+ * The 'statements' are the list of CRUD operations to be applied in this transaction.
*/
- virtual void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) = 0;
+ virtual void onTransactionPrepare(OperationContext* opCtx,
+ const OplogSlot& prepareOpTime,
+ std::vector<repl::ReplOperation>& statements) = 0;
/**
* The onTransactionAbort method is called when an atomic transaction aborts, before the
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 05820684dbc..1d189adc3f9 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -1085,16 +1085,14 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx,
void OpObserverImpl::onTransactionCommit(OperationContext* opCtx,
boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp) {
+ boost::optional<Timestamp> commitTimestamp,
+ std::vector<repl::ReplOperation>& statements) {
invariant(opCtx->getTxnNumber());
if (!opCtx->writesAreReplicated()) {
return;
}
- const auto txnParticipant = TransactionParticipant::get(opCtx);
- invariant(txnParticipant);
-
if (commitOplogEntryOpTime) {
invariant(commitTimestamp);
invariant(!commitTimestamp->isNull());
@@ -1105,26 +1103,24 @@ void OpObserverImpl::onTransactionCommit(OperationContext* opCtx,
opCtx, *commitOplogEntryOpTime, cmdObj.toBSON(), DurableTxnStateEnum::kCommitted);
} else {
invariant(!commitTimestamp);
- const auto stmts = txnParticipant->endTransactionAndRetrieveOperations(opCtx);
// It is possible that the transaction resulted in no changes. In that case, we should
// not write an empty applyOps entry.
- if (stmts.empty())
+ if (statements.empty())
return;
- const auto commitOpTime = logApplyOpsForTransaction(opCtx, stmts, OplogSlot()).writeOpTime;
+ const auto commitOpTime =
+ logApplyOpsForTransaction(opCtx, statements, OplogSlot()).writeOpTime;
invariant(!commitOpTime.isNull());
}
}
-void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) {
+void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
+ const OplogSlot& prepareOpTime,
+ std::vector<repl::ReplOperation>& statements) {
invariant(opCtx->getTxnNumber());
- const auto txnParticipant = TransactionParticipant::get(opCtx);
- invariant(txnParticipant);
- invariant(txnParticipant->inMultiDocumentTransaction());
invariant(!prepareOpTime.opTime.isNull());
- auto stmts = txnParticipant->endTransactionAndRetrieveOperations(opCtx);
// Don't write oplog entry on secondaries.
if (!opCtx->writesAreReplicated()) {
@@ -1145,7 +1141,7 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, const OplogSl
Lock::GlobalLock globalLock(opCtx, MODE_IX);
WriteUnitOfWork wuow(opCtx);
- logApplyOpsForTransaction(opCtx, stmts, prepareOpTime);
+ logApplyOpsForTransaction(opCtx, statements, prepareOpTime);
wuow.commit();
});
}
diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h
index 86ced3d6f81..2d09eaa710b 100644
--- a/src/mongo/db/op_observer_impl.h
+++ b/src/mongo/db/op_observer_impl.h
@@ -140,8 +140,11 @@ public:
OptionalCollectionUUID uuid);
void onTransactionCommit(OperationContext* opCtx,
boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp) final;
- void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) final;
+ boost::optional<Timestamp> commitTimestamp,
+ std::vector<repl::ReplOperation>& statements) final;
+ void onTransactionPrepare(OperationContext* opCtx,
+ const OplogSlot& prepareOpTime,
+ std::vector<repl::ReplOperation>& statments) final;
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final;
void onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo) final;
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index fff623af6ea..8d05bed6ffa 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -684,7 +684,11 @@ TEST_F(OpObserverLargeTransactionTest, TransactionTooLargeWhileCommitting) {
<< BSONBinData(halfTransactionData.get(), kHalfTransactionSize, BinDataGeneral)));
txnParticipant->addTransactionOperation(opCtx(), operation);
txnParticipant->addTransactionOperation(opCtx(), operation);
- ASSERT_THROWS_CODE(opObserver().onTransactionCommit(opCtx(), boost::none, boost::none),
+ ASSERT_THROWS_CODE(opObserver().onTransactionCommit(
+ opCtx(),
+ boost::none,
+ boost::none,
+ txnParticipant->retrieveCompletedTransactionOperations(opCtx())),
AssertionException,
ErrorCodes::TransactionTooLarge);
}
@@ -730,7 +734,8 @@ TEST_F(OpObserverTransactionTest, TransactionalPrepareTest) {
{
WriteUnitOfWork wuow(opCtx());
OplogSlot slot = repl::getNextOpTime(opCtx());
- opObserver().onTransactionPrepare(opCtx(), slot);
+ opObserver().onTransactionPrepare(
+ opCtx(), slot, txnParticipant->retrieveCompletedTransactionOperations(opCtx()));
opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
}
@@ -805,7 +810,8 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedCommitTest) {
txnParticipant->transitionToPreparedforTest();
const auto prepareSlot = repl::getNextOpTime(opCtx());
prepareTimestamp = prepareSlot.opTime.getTimestamp();
- opObserver().onTransactionPrepare(opCtx(), prepareSlot);
+ opObserver().onTransactionPrepare(
+ opCtx(), prepareSlot, txnParticipant->retrieveCompletedTransactionOperations(opCtx()));
commitSlot = repl::getNextOpTime(opCtx());
}
@@ -813,7 +819,13 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedCommitTest) {
// Mimic committing the transaction.
opCtx()->setWriteUnitOfWork(nullptr);
opCtx()->lockState()->unsetMaxLockTimeout();
- opObserver().onTransactionCommit(opCtx(), commitSlot, prepareTimestamp);
+
+ txnParticipant->transitionToCommittingWithPrepareforTest();
+ opObserver().onTransactionCommit(
+ opCtx(),
+ commitSlot,
+ prepareTimestamp,
+ txnParticipant->retrieveCompletedTransactionOperations(opCtx()));
repl::OplogInterfaceLocal oplogInterface(opCtx(), NamespaceString::kRsOplogNamespace.ns());
auto oplogIter = oplogInterface.makeIterator();
@@ -869,7 +881,8 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedAbortTest) {
txnParticipant->transitionToPreparedforTest();
const auto prepareSlot = repl::getNextOpTime(opCtx());
- opObserver().onTransactionPrepare(opCtx(), prepareSlot);
+ opObserver().onTransactionPrepare(
+ opCtx(), prepareSlot, txnParticipant->retrieveCompletedTransactionOperations(opCtx()));
abortSlot = repl::getNextOpTime(opCtx());
}
@@ -947,7 +960,8 @@ TEST_F(OpObserverTransactionTest, PreparingEmptyTransactionLogsEmptyApplyOps) {
{
WriteUnitOfWork wuow(opCtx());
OplogSlot slot = repl::getNextOpTime(opCtx());
- opObserver().onTransactionPrepare(opCtx(), slot);
+ opObserver().onTransactionPrepare(
+ opCtx(), slot, txnParticipant->retrieveCompletedTransactionOperations(opCtx()));
opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
}
@@ -972,7 +986,8 @@ TEST_F(OpObserverTransactionTest, PreparingTransactionWritesToTransactionTable)
WriteUnitOfWork wuow(opCtx());
OplogSlot slot = repl::getNextOpTime(opCtx());
prepareOpTime = slot.opTime;
- opObserver().onTransactionPrepare(opCtx(), slot);
+ opObserver().onTransactionPrepare(
+ opCtx(), slot, txnParticipant->retrieveCompletedTransactionOperations(opCtx()));
opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
}
@@ -1003,7 +1018,8 @@ TEST_F(OpObserverTransactionTest, AbortingPreparedTransactionWritesToTransaction
{
WriteUnitOfWork wuow(opCtx());
OplogSlot slot = repl::getNextOpTime(opCtx());
- opObserver().onTransactionPrepare(opCtx(), slot);
+ opObserver().onTransactionPrepare(
+ opCtx(), slot, txnParticipant->retrieveCompletedTransactionOperations(opCtx()));
opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
txnParticipant->transitionToPreparedforTest();
abortSlot = repl::getNextOpTime(opCtx());
@@ -1039,7 +1055,11 @@ TEST_F(OpObserverTransactionTest, CommittingUnpreparedNonEmptyTransactionWritesT
opObserver().onInserts(opCtx(), nss, uuid, insert.begin(), insert.end(), false);
}
- opObserver().onTransactionCommit(opCtx(), boost::none, boost::none);
+ opObserver().onTransactionCommit(
+ opCtx(),
+ boost::none,
+ boost::none,
+ txnParticipant->retrieveCompletedTransactionOperations(opCtx()));
opCtx()->getWriteUnitOfWork()->commit();
assertTxnRecord(txnNum(), {}, DurableTxnStateEnum::kCommitted);
@@ -1050,7 +1070,11 @@ TEST_F(OpObserverTransactionTest,
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction");
- opObserver().onTransactionCommit(opCtx(), boost::none, boost::none);
+ opObserver().onTransactionCommit(
+ opCtx(),
+ boost::none,
+ boost::none,
+ txnParticipant->retrieveCompletedTransactionOperations(opCtx()));
txnParticipant->stashTransactionResources(opCtx());
@@ -1069,7 +1093,8 @@ TEST_F(OpObserverTransactionTest, CommittingPreparedTransactionWritesToTransacti
WriteUnitOfWork wuow(opCtx());
OplogSlot slot = repl::getNextOpTime(opCtx());
prepareOpTime = slot.opTime;
- opObserver().onTransactionPrepare(opCtx(), slot);
+ opObserver().onTransactionPrepare(
+ opCtx(), slot, txnParticipant->retrieveCompletedTransactionOperations(opCtx()));
opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
txnParticipant->transitionToPreparedforTest();
}
@@ -1081,7 +1106,13 @@ TEST_F(OpObserverTransactionTest, CommittingPreparedTransactionWritesToTransacti
// Mimic committing the transaction.
opCtx()->setWriteUnitOfWork(nullptr);
opCtx()->lockState()->unsetMaxLockTimeout();
- opObserver().onTransactionCommit(opCtx(), commitSlot, prepareOpTime.getTimestamp());
+
+ txnParticipant->transitionToCommittingWithPrepareforTest();
+ opObserver().onTransactionCommit(
+ opCtx(),
+ commitSlot,
+ prepareOpTime.getTimestamp(),
+ txnParticipant->retrieveCompletedTransactionOperations(opCtx()));
assertTxnRecord(txnNum(), commitOpTime, DurableTxnStateEnum::kCommitted);
}
@@ -1113,7 +1144,11 @@ TEST_F(OpObserverTransactionTest, TransactionalInsertTest) {
AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false);
- opObserver().onTransactionCommit(opCtx(), boost::none, boost::none);
+ opObserver().onTransactionCommit(
+ opCtx(),
+ boost::none,
+ boost::none,
+ txnParticipant->retrieveCompletedTransactionOperations(opCtx()));
auto oplogEntryObj = getSingleOplogEntry(opCtx());
checkCommonFields(oplogEntryObj);
OplogEntry oplogEntry = assertGet(OplogEntry::parse(oplogEntryObj));
@@ -1190,7 +1225,11 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTest) {
AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
opObserver().onUpdate(opCtx(), update1);
opObserver().onUpdate(opCtx(), update2);
- opObserver().onTransactionCommit(opCtx(), boost::none, boost::none);
+ opObserver().onTransactionCommit(
+ opCtx(),
+ boost::none,
+ boost::none,
+ txnParticipant->retrieveCompletedTransactionOperations(opCtx()));
auto oplogEntry = getSingleOplogEntry(opCtx());
checkCommonFields(oplogEntry);
auto o = oplogEntry.getObjectField("o");
@@ -1243,7 +1282,11 @@ TEST_F(OpObserverTransactionTest, TransactionalDeleteTest) {
BSON("_id" << 1 << "data"
<< "y"));
opObserver().onDelete(opCtx(), nss2, uuid2, 0, false, boost::none);
- opObserver().onTransactionCommit(opCtx(), boost::none, boost::none);
+ opObserver().onTransactionCommit(
+ opCtx(),
+ boost::none,
+ boost::none,
+ txnParticipant->retrieveCompletedTransactionOperations(opCtx()));
auto oplogEntry = getSingleOplogEntry(opCtx());
checkCommonFields(oplogEntry);
auto o = oplogEntry.getObjectField("o");
diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h
index 3a1ad19e848..f3e6d5e3a60 100644
--- a/src/mongo/db/op_observer_noop.h
+++ b/src/mongo/db/op_observer_noop.h
@@ -139,8 +139,11 @@ public:
OptionalCollectionUUID uuid) override {}
void onTransactionCommit(OperationContext* opCtx,
boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp) override{};
- void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override{};
+ boost::optional<Timestamp> commitTimestamp,
+ std::vector<repl::ReplOperation>& statements) override{};
+ void onTransactionPrepare(OperationContext* opCtx,
+ const OplogSlot& prepareOpTime,
+ std::vector<repl::ReplOperation>& statements) override{};
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override{};
void onReplicationRollback(OperationContext* opCtx,
diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h
index e3cd4b642d7..5e4a69996ac 100644
--- a/src/mongo/db/op_observer_registry.h
+++ b/src/mongo/db/op_observer_registry.h
@@ -261,16 +261,19 @@ public:
void onTransactionCommit(OperationContext* opCtx,
boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp) override {
+ boost::optional<Timestamp> commitTimestamp,
+ std::vector<repl::ReplOperation>& statements) override {
ReservedTimes times{opCtx};
for (auto& o : _observers)
- o->onTransactionCommit(opCtx, commitOplogEntryOpTime, commitTimestamp);
+ o->onTransactionCommit(opCtx, commitOplogEntryOpTime, commitTimestamp, statements);
}
- void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override {
+ void onTransactionPrepare(OperationContext* opCtx,
+ const OplogSlot& prepareOpTime,
+ std::vector<repl::ReplOperation>& statements) override {
ReservedTimes times{opCtx};
for (auto& observer : _observers) {
- observer->onTransactionPrepare(opCtx, prepareOpTime);
+ observer->onTransactionPrepare(opCtx, prepareOpTime, statements);
}
}
diff --git a/src/mongo/db/repl/do_txn_test.cpp b/src/mongo/db/repl/do_txn_test.cpp
index 2e089afebbd..4c573a9c857 100644
--- a/src/mongo/db/repl/do_txn_test.cpp
+++ b/src/mongo/db/repl/do_txn_test.cpp
@@ -63,7 +63,8 @@ public:
*/
void onTransactionCommit(OperationContext* opCtx,
boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp) override;
+ boost::optional<Timestamp> commitTimestamp,
+ std::vector<repl::ReplOperation>& statements) override;
// If present, holds the applyOps oplog entry written out by the ObObserverImpl
// onTransactionCommit.
@@ -72,7 +73,8 @@ public:
void OpObserverMock::onTransactionCommit(OperationContext* opCtx,
boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp) {
+ boost::optional<Timestamp> commitTimestamp,
+ std::vector<repl::ReplOperation>& statements) {
ASSERT(!commitOplogEntryOpTime) << commitOplogEntryOpTime->opTime;
ASSERT(!commitTimestamp) << *commitTimestamp;
diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h
index be9b1b54c56..476e724518d 100644
--- a/src/mongo/db/s/config_server_op_observer.h
+++ b/src/mongo/db/s/config_server_op_observer.h
@@ -160,9 +160,12 @@ public:
void onTransactionCommit(OperationContext* opCtx,
boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp) override {}
+ boost::optional<Timestamp> commitTimestamp,
+ std::vector<repl::ReplOperation>& statements) override {}
- void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override {}
+ void onTransactionPrepare(OperationContext* opCtx,
+ const OplogSlot& prepareOpTime,
+ std::vector<repl::ReplOperation>& statements) override {}
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override {}
diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h
index 6459ba2747a..d0fcf00b79b 100644
--- a/src/mongo/db/s/shard_server_op_observer.h
+++ b/src/mongo/db/s/shard_server_op_observer.h
@@ -161,9 +161,12 @@ public:
void onTransactionCommit(OperationContext* opCtx,
boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp) override {}
+ boost::optional<Timestamp> commitTimestamp,
+ std::vector<repl::ReplOperation>& statements) override {}
- void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override {}
+ void onTransactionPrepare(OperationContext* opCtx,
+ const OplogSlot& prepareOpTime,
+ std::vector<repl::ReplOperation>& statements) override {}
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override {}
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 7a5d0e812e8..8f710669788 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -928,7 +928,8 @@ Timestamp TransactionParticipant::prepareTransaction(OperationContext* opCtx,
// We need to unlock the session to run the opObserver onTransactionPrepare, which calls back
// into the session.
lk.unlock();
- opCtx->getServiceContext()->getOpObserver()->onTransactionPrepare(opCtx, prepareOplogSlot);
+ opCtx->getServiceContext()->getOpObserver()->onTransactionPrepare(
+ opCtx, prepareOplogSlot, retrieveCompletedTransactionOperations(opCtx));
abortGuard.dismiss();
@@ -992,7 +993,7 @@ void TransactionParticipant::addTransactionOperation(OperationContext* opCtx,
_transactionOperationBytes <= BSONObjMaxInternalSize);
}
-std::vector<repl::ReplOperation> TransactionParticipant::endTransactionAndRetrieveOperations(
+std::vector<repl::ReplOperation>& TransactionParticipant::retrieveCompletedTransactionOperations(
OperationContext* opCtx) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -1000,13 +1001,32 @@ std::vector<repl::ReplOperation> TransactionParticipant::endTransactionAndRetrie
// and migration, which do not check out the session.
_checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true);
- // Ensure that we only ever end a transaction when prepared or in progress.
- invariant(_txnState.isInSet(lk, TransactionState::kPrepared | TransactionState::kInProgress),
+ // Ensure that we only ever retrieve a transaction's completed operations when in progress,
+ // committing with prepare, or prepared.
+ invariant(_txnState.isInSet(lk,
+ TransactionState::kInProgress |
+ TransactionState::kCommittingWithPrepare |
+ TransactionState::kPrepared),
+ str::stream() << "Current state: " << _txnState);
+
+ return _transactionOperations;
+}
+
+void TransactionParticipant::clearOperationsInMemory(OperationContext* opCtx) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ // Always check session's txnNumber and '_txnState', since they can be modified by session kill
+ // and migration, which do not check out the session.
+ _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true);
+
+ // Ensure that we only ever end a transaction when committing with prepare or in progress.
+ invariant(_txnState.isInSet(
+ lk, TransactionState::kCommittingWithPrepare | TransactionState::kInProgress),
str::stream() << "Current state: " << _txnState);
invariant(_autoCommit);
_transactionOperationBytes = 0;
- return std::move(_transactionOperations);
+ _transactionOperations.clear();
}
void TransactionParticipant::commitUnpreparedTransaction(OperationContext* opCtx) {
@@ -1028,7 +1048,11 @@ void TransactionParticipant::commitUnpreparedTransaction(OperationContext* opCtx
lk.unlock();
auto opObserver = opCtx->getServiceContext()->getOpObserver();
invariant(opObserver);
- opObserver->onTransactionCommit(opCtx, boost::none, boost::none);
+ opObserver->onTransactionCommit(
+ opCtx, boost::none, boost::none, retrieveCompletedTransactionOperations(opCtx));
+
+ clearOperationsInMemory(opCtx);
+
lk.lock();
_checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true);
@@ -1108,9 +1132,14 @@ void TransactionParticipant::commitPreparedTransaction(OperationContext* opCtx,
{
// Once the transaction is committed, the oplog entry must be written.
UninterruptibleLockGuard lockGuard(opCtx->lockState());
- opObserver->onTransactionCommit(opCtx, commitOplogSlot, commitTimestamp);
+ opObserver->onTransactionCommit(opCtx,
+ commitOplogSlot,
+ commitTimestamp,
+ retrieveCompletedTransactionOperations(opCtx));
}
+ clearOperationsInMemory(opCtx);
+
lk.lock();
_checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true);
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index 350bd96e953..5eee0afb03a 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -303,11 +303,19 @@ public:
void addTransactionOperation(OperationContext* opCtx, const repl::ReplOperation& operation);
/**
- * Returns and clears the stored operations for an multi-document (non-autocommit) transaction,
- * and marks the transaction as closed. It is illegal to attempt to add operations to the
- * transaction after this is called.
+ * Returns a reference to the stored operations for a completed multi-document (non-autocommit)
+ * transaction. "Completed" implies that no more operations will be added to the transaction.
+ * It is legal to call this method only when the transaction state is in progress or committed.
*/
- std::vector<repl::ReplOperation> endTransactionAndRetrieveOperations(OperationContext* opCtx);
+ std::vector<repl::ReplOperation>& retrieveCompletedTransactionOperations(
+ OperationContext* opCtx);
+
+ /**
+ * Clears the stored operations for an multi-document (non-autocommit) transaction, marking
+ * the transaction as closed. It is illegal to attempt to add operations to the transaction
+ * after this is called.
+ */
+ void clearOperationsInMemory(OperationContext* opCtx);
/**
* Yield or reacquire locks for prepared transacitons, used on replication state transition.
@@ -538,6 +546,12 @@ public:
_txnState.transitionTo(lk, TransactionState::kPrepared);
}
+ void transitionToCommittingWithPrepareforTest() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _txnState.transitionTo(lk, TransactionState::kCommittingWithPrepare);
+ }
+
+
void transitionToAbortedWithoutPrepareforTest() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
_txnState.transitionTo(lk, TransactionState::kAbortedWithoutPrepare);
diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
index a65de737246..7535712469b 100644
--- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp
+++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
@@ -87,9 +87,11 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
class OpObserverMock : public OpObserverNoop {
public:
- void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override {
+ void onTransactionPrepare(OperationContext* opCtx,
+ const OplogSlot& prepareOpTime,
+ std::vector<repl::ReplOperation>& statements) override {
ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork());
- OpObserverNoop::onTransactionPrepare(opCtx, prepareOpTime);
+ OpObserverNoop::onTransactionPrepare(opCtx, prepareOpTime, statements);
uassert(ErrorCodes::OperationFailed,
"onTransactionPrepare() failed",
@@ -104,9 +106,11 @@ public:
void onTransactionCommit(OperationContext* opCtx,
boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp) override {
+ boost::optional<Timestamp> commitTimestamp,
+ std::vector<repl::ReplOperation>& statements) override {
ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork());
- OpObserverNoop::onTransactionCommit(opCtx, commitOplogEntryOpTime, commitTimestamp);
+ OpObserverNoop::onTransactionCommit(
+ opCtx, commitOplogEntryOpTime, commitTimestamp, statements);
uassert(ErrorCodes::OperationFailed,
"onTransactionCommit() failed",
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index 70c55ab773e..4541995166d 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -90,7 +90,9 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
class OpObserverMock : public OpObserverNoop {
public:
- void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override;
+ void onTransactionPrepare(OperationContext* opCtx,
+ const OplogSlot& prepareOpTime,
+ std::vector<repl::ReplOperation>& statements) override;
bool onTransactionPrepareThrowsException = false;
bool transactionPrepared = false;
@@ -98,12 +100,15 @@ public:
void onTransactionCommit(OperationContext* opCtx,
boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp) override;
+ boost::optional<Timestamp> commitTimestamp,
+ std::vector<repl::ReplOperation>& statements) override;
bool onTransactionCommitThrowsException = false;
bool transactionCommitted = false;
- stdx::function<void(boost::optional<OplogSlot>, boost::optional<Timestamp>)>
+ stdx::function<void(
+ boost::optional<OplogSlot>, boost::optional<Timestamp>, std::vector<repl::ReplOperation>&)>
onTransactionCommitFn = [](boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp) {};
+ boost::optional<Timestamp> commitTimestamp,
+ std::vector<repl::ReplOperation>& statements) {};
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override;
@@ -120,9 +125,11 @@ public:
const repl::OpTime dropOpTime = {Timestamp(Seconds(100), 1U), 1LL};
};
-void OpObserverMock::onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) {
+void OpObserverMock::onTransactionPrepare(OperationContext* opCtx,
+ const OplogSlot& prepareOpTime,
+ std::vector<repl::ReplOperation>& statements) {
ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork());
- OpObserverNoop::onTransactionPrepare(opCtx, prepareOpTime);
+ OpObserverNoop::onTransactionPrepare(opCtx, prepareOpTime, statements);
uassert(ErrorCodes::OperationFailed,
"onTransactionPrepare() failed",
@@ -133,7 +140,8 @@ void OpObserverMock::onTransactionPrepare(OperationContext* opCtx, const OplogSl
void OpObserverMock::onTransactionCommit(OperationContext* opCtx,
boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp) {
+ boost::optional<Timestamp> commitTimestamp,
+ std::vector<repl::ReplOperation>& statements) {
if (commitOplogEntryOpTime) {
invariant(commitTimestamp);
ASSERT_FALSE(opCtx->lockState()->inAWriteUnitOfWork());
@@ -144,12 +152,12 @@ void OpObserverMock::onTransactionCommit(OperationContext* opCtx,
ASSERT(opCtx->lockState()->inAWriteUnitOfWork());
}
- OpObserverNoop::onTransactionCommit(opCtx, commitOplogEntryOpTime, commitTimestamp);
+ OpObserverNoop::onTransactionCommit(opCtx, commitOplogEntryOpTime, commitTimestamp, statements);
uassert(ErrorCodes::OperationFailed,
"onTransactionCommit() failed",
!onTransactionCommitThrowsException);
transactionCommitted = true;
- onTransactionCommitFn(commitOplogEntryOpTime, commitTimestamp);
+ onTransactionCommitFn(commitOplogEntryOpTime, commitTimestamp, statements);
}
void OpObserverMock::onTransactionAbort(OperationContext* opCtx,
@@ -511,12 +519,15 @@ TEST_F(TxnParticipantTest, CommitTransactionSetsCommitTimestampOnPreparedTransac
auto originalFn = _opObserver->onTransactionCommitFn;
_opObserver->onTransactionCommitFn = [&](boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp) {
- originalFn(commitOplogEntryOpTime, commitTimestamp);
+ boost::optional<Timestamp> commitTimestamp,
+ std::vector<repl::ReplOperation>& statements) {
+ originalFn(commitOplogEntryOpTime, commitTimestamp, statements);
ASSERT(commitOplogEntryOpTime);
ASSERT(commitTimestamp);
ASSERT_GT(*commitTimestamp, prepareTimestamp);
+
+ ASSERT(statements.empty());
};
txnParticipant->commitPreparedTransaction(opCtx(), commitTS);
@@ -548,11 +559,13 @@ TEST_F(TxnParticipantTest, CommitTransactionDoesNotSetCommitTimestampOnUnprepare
auto originalFn = _opObserver->onTransactionCommitFn;
_opObserver->onTransactionCommitFn = [&](boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp) {
- originalFn(commitOplogEntryOpTime, commitTimestamp);
+ boost::optional<Timestamp> commitTimestamp,
+ std::vector<repl::ReplOperation>& statements) {
+ originalFn(commitOplogEntryOpTime, commitTimestamp, statements);
ASSERT_FALSE(commitOplogEntryOpTime);
ASSERT_FALSE(commitTimestamp);
ASSERT(opCtx()->recoveryUnit()->getCommitTimestamp().isNull());
+ ASSERT(statements.empty());
};
auto txnParticipant = TransactionParticipant::get(opCtx());
@@ -696,7 +709,21 @@ TEST_F(TxnParticipantTest, ConcurrencyOfAddTransactionOperationAndAbort) {
ErrorCodes::NoSuchTransaction);
}
-TEST_F(TxnParticipantTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndAbort) {
+TEST_F(TxnParticipantTest, ConcurrencyOfRetrieveCompletedTransactionOperationsAndAbort) {
+ auto sessionCheckout = checkOutSession();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->unstashTransactionResources(opCtx(), "insert");
+
+ // The transaction may be aborted without checking out the txnParticipant.
+ txnParticipant->abortArbitraryTransaction();
+
+ // A retrieveCompletedTransactionOperations() after an abort should uassert.
+ ASSERT_THROWS_CODE(txnParticipant->retrieveCompletedTransactionOperations(opCtx()),
+ AssertionException,
+ ErrorCodes::NoSuchTransaction);
+}
+
+TEST_F(TxnParticipantTest, ConcurrencyOfClearOperationsInMemory) {
auto sessionCheckout = checkOutSession();
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
@@ -704,8 +731,8 @@ TEST_F(TxnParticipantTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndAb
// The transaction may be aborted without checking out the txnParticipant.
txnParticipant->abortArbitraryTransaction();
- // An endTransactionAndRetrieveOperations() after an abort should uassert.
- ASSERT_THROWS_CODE(txnParticipant->endTransactionAndRetrieveOperations(opCtx()),
+ // An clearOperationsInMemory() after an abort should uassert.
+ ASSERT_THROWS_CODE(txnParticipant->clearOperationsInMemory(opCtx()),
AssertionException,
ErrorCodes::NoSuchTransaction);
}
@@ -867,13 +894,16 @@ TEST_F(TxnParticipantTest, KillSessionsDuringPreparedCommitDoesNotAbortTransacti
auto originalFn = _opObserver->onTransactionCommitFn;
_opObserver->onTransactionCommitFn = [&](boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp) {
- originalFn(commitOplogEntryOpTime, commitTimestamp);
+ boost::optional<Timestamp> commitTimestamp,
+ std::vector<repl::ReplOperation>& statements) {
+ originalFn(commitOplogEntryOpTime, commitTimestamp, statements);
ASSERT(commitOplogEntryOpTime);
ASSERT(commitTimestamp);
ASSERT_GT(*commitTimestamp, prepareTimestamp);
+ ASSERT(statements.empty());
+
// The transaction may be aborted without checking out the txnParticipant.
txnParticipant->abortArbitraryTransaction();
ASSERT_FALSE(txnParticipant->transactionIsAborted());
@@ -984,13 +1014,16 @@ TEST_F(TxnParticipantTest, ArbitraryAbortDuringPreparedCommitDoesNotAbortTransac
auto originalFn = _opObserver->onTransactionCommitFn;
_opObserver->onTransactionCommitFn = [&](boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp) {
- originalFn(commitOplogEntryOpTime, commitTimestamp);
+ boost::optional<Timestamp> commitTimestamp,
+ std::vector<repl::ReplOperation>& statements) {
+ originalFn(commitOplogEntryOpTime, commitTimestamp, statements);
ASSERT(commitOplogEntryOpTime);
ASSERT(commitTimestamp);
ASSERT_GT(*commitTimestamp, prepareTimestamp);
+ ASSERT(statements.empty());
+
// The transaction may be aborted without checking out the txnParticipant.
auto func = [&](OperationContext* opCtx) { txnParticipant->abortArbitraryTransaction(); };
runFunctionFromDifferentOpCtx(func);
@@ -1211,7 +1244,8 @@ DEATH_TEST_F(TxnParticipantTest, AbortIsIllegalDuringCommittingPreparedTransacti
auto sessionId = *opCtx()->getLogicalSessionId();
auto txnNum = *opCtx()->getTxnNumber();
_opObserver->onTransactionCommitFn = [&](boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp) {
+ boost::optional<Timestamp> commitTimestamp,
+ std::vector<repl::ReplOperation>& statements) {
// This should never happen.
auto func = [&](OperationContext* opCtx) {
opCtx->setLogicalSessionId(sessionId);