author     Daniel Gottlieb <daniel.gottlieb@mongodb.com>     2023-02-06 10:17:24 -0500
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2023-02-06 16:59:56 +0000
commit     a7b6c12cd6975e17148a7effa7d964936619f0b5 (patch)
tree       a73a072c7d6c5e53177944d92b0968f5ce455365
parent     39ddb8c2c82900ccd4452928d9fb0c8f22b35e3a (diff)
SERVER-72744: Introduce code path for a primary to commit a split prepared transaction.
-rw-r--r--  src/mongo/db/repl/mock_repl_coord_server_fixture.h        |   2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.cpp        |  10
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.h          |  10
-rw-r--r--  src/mongo/db/transaction/SConscript                       |   4
-rw-r--r--  src/mongo/db/transaction/transaction_participant.cpp      |  85
-rw-r--r--  src/mongo/db/transaction/transaction_participant.h        |  14
-rw-r--r--  src/mongo/db/transaction/transaction_participant_test.cpp | 341
7 files changed, 433 insertions(+), 33 deletions(-)
diff --git a/src/mongo/db/repl/mock_repl_coord_server_fixture.h b/src/mongo/db/repl/mock_repl_coord_server_fixture.h
index e0859800f63..6fa3f076851 100644
--- a/src/mongo/db/repl/mock_repl_coord_server_fixture.h
+++ b/src/mongo/db/repl/mock_repl_coord_server_fixture.h
@@ -42,7 +42,7 @@ class StorageInterfaceMock;
} // namespace repl
/**
- * This is a basic fixture that is backed by an ephemeral storage engine and a mock replication
+ * This is a basic fixture that is backed by a real storage engine and a mock replication
* coordinator that is running as primary.
*/
class MockReplCoordServerFixture : public ServiceContextMongoDTest {
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index b31f3757586..b556ccd3ca3 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -244,11 +244,13 @@ void ReplicationCoordinatorMock::_setMyLastAppliedOpTimeAndWallTime(
    _myLastAppliedOpTime = opTimeAndWallTime.opTime;
    _myLastAppliedWallTime = opTimeAndWallTime.wallTime;

-    _setCurrentCommittedSnapshotOpTime(lk, opTimeAndWallTime.opTime);
+    if (_updateCommittedSnapshot) {
+        _setCurrentCommittedSnapshotOpTime(lk, opTimeAndWallTime.opTime);

-    if (auto storageEngine = _service->getStorageEngine()) {
-        if (auto snapshotManager = storageEngine->getSnapshotManager()) {
-            snapshotManager->setCommittedSnapshot(opTimeAndWallTime.opTime.getTimestamp());
+        if (auto storageEngine = _service->getStorageEngine()) {
+            if (auto snapshotManager = storageEngine->getSnapshotManager()) {
+                snapshotManager->setCommittedSnapshot(opTimeAndWallTime.opTime.getTimestamp());
+            }
        }
    }
}
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index a5ee67ef28f..ef43e450400 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -423,6 +423,15 @@ public:
virtual SplitPrepareSessionManager* getSplitPrepareSessionManager() override;
+ /**
+ * If this is true, the mock will update the "committed snapshot" every time the "last applied"
+ * is updated. That behavior can be disabled for tests that need more control over what's
+ * majority committed.
+ */
+ void setUpdateCommittedSnapshot(bool val) {
+ _updateCommittedSnapshot = val;
+ }
+
private:
void _setMyLastAppliedOpTimeAndWallTime(WithLock lk,
const OpTimeAndWallTime& opTimeAndWallTime);
@@ -454,6 +463,7 @@ private:
bool _canAcceptNonLocalWrites = false;
SplitPrepareSessionManager _splitSessionManager;
+ bool _updateCommittedSnapshot = true;
};
} // namespace repl
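
The setUpdateCommittedSnapshot() knob added above is what the new unit test relies on to keep the mock's committed snapshot from tracking the last-applied optime. A minimal sketch of how a test might use it (the surrounding helper function is illustrative, not part of this patch):

    // Sketch only: assumes a unit test running against ReplicationCoordinatorMock.
    void disableAutomaticCommittedSnapshot(OperationContext* opCtx) {
        auto* replCoordMock = dynamic_cast<repl::ReplicationCoordinatorMock*>(
            repl::ReplicationCoordinator::get(opCtx));
        invariant(replCoordMock);

        // With this disabled, advancing the last-applied optime no longer moves the
        // "majority committed" point; the test decides when (or whether) that happens.
        replCoordMock->setUpdateCommittedSnapshot(false);
    }
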
diff --git a/src/mongo/db/transaction/SConscript b/src/mongo/db/transaction/SConscript
index ec8eadcd64a..210440097fc 100644
--- a/src/mongo/db/transaction/SConscript
+++ b/src/mongo/db/transaction/SConscript
@@ -100,7 +100,10 @@ env.CppUnitTest(
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/auth/authmocks',
+ '$BUILD_DIR/mongo/db/dbhelpers',
'$BUILD_DIR/mongo/db/op_observer/op_observer',
+ '$BUILD_DIR/mongo/db/op_observer/op_observer_impl',
+ '$BUILD_DIR/mongo/db/op_observer/oplog_writer_impl',
'$BUILD_DIR/mongo/db/repl/image_collection_entry',
'$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture',
'$BUILD_DIR/mongo/db/repl/replica_set_aware_service',
@@ -110,6 +113,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/session/session_catalog',
'$BUILD_DIR/mongo/db/session/session_catalog_mongod',
'$BUILD_DIR/mongo/db/stats/transaction_stats',
+ '$BUILD_DIR/mongo/db/storage/storage_control',
'transaction',
'transaction_api',
'transaction_operations',
diff --git a/src/mongo/db/transaction/transaction_participant.cpp b/src/mongo/db/transaction/transaction_participant.cpp
index e615798495f..b408894fcff 100644
--- a/src/mongo/db/transaction/transaction_participant.cpp
+++ b/src/mongo/db/transaction/transaction_participant.cpp
@@ -1913,7 +1913,6 @@ void TransactionParticipant::Participant::commitPreparedTransaction(
// Once entering "committing with prepare" we cannot throw an exception.
// TODO (SERVER-71610): Fix to be interruptible or document exception.
UninterruptibleLockGuard noInterrupt(opCtx->lockState()); // NOLINT.
- opCtx->recoveryUnit()->setCommitTimestamp(commitTimestamp);
// On secondary, we generate a fake empty oplog slot, since it's not used by opObserver.
OplogSlot commitOplogSlot;
@@ -1946,13 +1945,28 @@ void TransactionParticipant::Participant::commitPreparedTransaction(
// entry.
invariant(!o().lastWriteOpTime.isNull());
- // If commitOplogEntryOpTime is a nullopt, then we grab the OpTime from the commitOplogSlot
- // which will only be set if we are primary. Otherwise, the commitOplogEntryOpTime must have
- // been passed in during secondary oplog application.
auto commitOplogSlotOpTime = commitOplogEntryOpTime.value_or(commitOplogSlot);
- opCtx->recoveryUnit()->setDurableTimestamp(commitOplogSlotOpTime.getTimestamp());
-
- _commitStorageTransaction(opCtx);
+ auto* splitPrepareManager =
+ repl::ReplicationCoordinator::get(opCtx)->getSplitPrepareSessionManager();
+ if (opCtx->writesAreReplicated() &&
+ splitPrepareManager->isSessionSplit(
+ _sessionId(), o().activeTxnNumberAndRetryCounter.getTxnNumber())) {
+ // If we are a primary committing a transaction that was split into smaller prepared
+ // transactions, use a special commit code path.
+ _commitSplitPreparedTxnOnPrimary(opCtx,
+ splitPrepareManager,
+ _sessionId(),
+ o().activeTxnNumberAndRetryCounter.getTxnNumber(),
+ commitTimestamp,
+ commitOplogSlot.getTimestamp());
+ } else {
+ // If commitOplogEntryOpTime is a nullopt, then we grab the OpTime from the
+ // commitOplogSlot which will only be set if we are primary. Otherwise, the
+ // commitOplogEntryOpTime must have been passed in during secondary oplog application.
+ opCtx->recoveryUnit()->setCommitTimestamp(commitTimestamp);
+ opCtx->recoveryUnit()->setDurableTimestamp(commitOplogSlotOpTime.getTimestamp());
+ _commitStorageTransaction(opCtx);
+ }
auto opObserver = opCtx->getServiceContext()->getOpObserver();
invariant(opObserver);
@@ -1998,13 +2012,68 @@ void TransactionParticipant::Participant::_commitStorageTransaction(OperationCon
opCtx->lockState()->unsetMaxLockTimeout();
}
+void TransactionParticipant::Participant::_commitSplitPreparedTxnOnPrimary(
+ OperationContext* parentOpCtx,
+ repl::SplitPrepareSessionManager* splitPrepareManager,
+ const LogicalSessionId& userSessionId,
+ const TxnNumber& userTxnNumber,
+ const Timestamp& commitTimestamp,
+ const Timestamp& durableTimestamp) {
+
+ for (const repl::PooledSession& session :
+ splitPrepareManager->getSplitSessions(userSessionId, userTxnNumber).get()) {
+
+ auto splitClientOwned = parentOpCtx->getServiceContext()->makeClient("tempSplitClient");
+ auto splitOpCtx = splitClientOwned->makeOperationContext();
+ AlternativeClientRegion acr(splitClientOwned);
+
+ std::unique_ptr<MongoDSessionCatalog::Session> checkedOutSession;
+
+ repl::UnreplicatedWritesBlock notReplicated(splitOpCtx.get());
+ splitOpCtx->setLogicalSessionId(session.getSessionId());
+ splitOpCtx->setTxnNumber(session.getTxnNumber());
+ splitOpCtx->setInMultiDocumentTransaction();
+
+ auto mongoDSessionCatalog = MongoDSessionCatalog::get(splitOpCtx.get());
+ checkedOutSession = mongoDSessionCatalog->checkOutSession(splitOpCtx.get());
+ TransactionParticipant::Participant newTxnParticipant =
+ TransactionParticipant::get(splitOpCtx.get());
+ newTxnParticipant.beginOrContinueTransactionUnconditionally(
+ splitOpCtx.get(), {*(splitOpCtx->getTxnNumber())});
+ newTxnParticipant.unstashTransactionResources(splitOpCtx.get(), "commitTransaction");
+
+ splitOpCtx->recoveryUnit()->setCommitTimestamp(commitTimestamp);
+ splitOpCtx->recoveryUnit()->setDurableTimestamp(durableTimestamp);
+
+ {
+ Lock::GlobalLock rstl(splitOpCtx.get(), LockMode::MODE_IX);
+ newTxnParticipant._commitStorageTransaction(splitOpCtx.get());
+ auto operationCount = newTxnParticipant.p().transactionOperations.numOperations();
+ auto oplogOperationBytes =
+ newTxnParticipant.p().transactionOperations.getTotalOperationBytes();
+ newTxnParticipant.clearOperationsInMemory(splitOpCtx.get());
+
+ newTxnParticipant._finishCommitTransaction(
+ splitOpCtx.get(), operationCount, oplogOperationBytes);
+ }
+
+ newTxnParticipant.stashTransactionResources(splitOpCtx.get());
+ checkedOutSession->checkIn(splitOpCtx.get(), OperationContextSession::CheckInReason::kDone);
+ }
+
+ parentOpCtx->recoveryUnit()->setCommitTimestamp(commitTimestamp);
+ parentOpCtx->recoveryUnit()->setDurableTimestamp(durableTimestamp);
+ this->_commitStorageTransaction(parentOpCtx);
+}
+
void TransactionParticipant::Participant::_finishCommitTransaction(
OperationContext* opCtx, size_t operationCount, size_t oplogOperationBytes) noexcept {
{
auto tickSource = opCtx->getServiceContext()->getTickSource();
stdx::lock_guard<Client> lk(*opCtx->getClient());
o(lk).txnState.transitionTo(TransactionState::kCommitted);
-
+ // Features such as the "split prepared transaction" optimization will not attribute
+ // transaction metrics to the internal sessions.
o(lk).transactionMetricsObserver.onCommit(opCtx,
ServerTransactionsMetrics::get(opCtx),
tickSource,
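
For orientation on the branch added to commitPreparedTransaction() above: isSessionSplit() only returns true if the prepare was previously split across pooled internal sessions during secondary oplog application. A condensed sketch of that registration step, using only manager calls that appear elsewhere in this patch (the wrapper function itself is hypothetical):

    // Sketch: secondary-side registration that the primary-side commit path later
    // queries via isSessionSplit()/getSplitSessions(). Error handling omitted.
    void registerSplitPrepareForSketch(OperationContext* opCtx,
                                       const LogicalSessionId& userLsid,
                                       TxnNumber userTxnNumber,
                                       int numSplits) {
        auto* splitPrepareManager =
            repl::ReplicationCoordinator::get(opCtx)->getSplitPrepareSessionManager();

        // Hands back pooled internal sessions; oplog application prepares a slice of
        // the user transaction under each one, in its own RecoveryUnit (see
        // callUnderSplitSession() in the new unit test).
        const std::vector<InternalSessionPool::Session>& splitSessions =
            splitPrepareManager->splitSession(userLsid, userTxnNumber, numSplits);

        // Each returned session later has prepareTransaction() called on it with the
        // same prepare OpTime as the user transaction.
        (void)splitSessions;
    }
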
diff --git a/src/mongo/db/transaction/transaction_participant.h b/src/mongo/db/transaction/transaction_participant.h
index a945034c98f..152637db865 100644
--- a/src/mongo/db/transaction/transaction_participant.h
+++ b/src/mongo/db/transaction/transaction_participant.h
@@ -840,6 +840,20 @@ public:
// This should be called *without* the Client being locked.
void _commitStorageTransaction(OperationContext* opCtx);
+ // Commits a "split prepared" transaction. Prepared transactions processed on secondaries
+ // may split the storage writes into multiple RecoveryUnits. This method will be invoked by
+ // a primary such that it looks for all recovery units and commits them such that they
+ // become visible to snapshot/timestamped readers atomically.
+ //
+ // This must be called while an OplogSlot is being held open at or earlier than the input
+ // `durableTimestamp`.
+ void _commitSplitPreparedTxnOnPrimary(OperationContext* opCtx,
+ repl::SplitPrepareSessionManager* splitPrepareManager,
+ const LogicalSessionId& sessionId,
+ const TxnNumber& txnNumber,
+ const Timestamp& commitTimestamp,
+ const Timestamp& durableTimestamp);
+
// Stash transaction resources.
void _stashActiveTransaction(OperationContext* opCtx);
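
Restating the header comment's timestamp contract as a sketch of the ordering the new commit path depends on (the checks below are illustrative; prepareTimestamp is not a parameter of the real method):

    // Illustrative only: invariants a caller of _commitSplitPreparedTxnOnPrimary
    // is expected to uphold.
    void splitCommitTimestampContractSketch(const Timestamp& prepareTimestamp,
                                            const Timestamp& commitTimestamp,
                                            const Timestamp& durableTimestamp) {
        // The commit ("visible") timestamp dictated by the transaction coordinator
        // cannot precede the prepare timestamp.
        invariant(prepareTimestamp <= commitTimestamp);

        // The durable timestamp comes from the commit oplog slot, which must be held
        // open at or earlier than this value for the duration of the commit, so the
        // stable timestamp cannot advance past a commit whose oplog entry is not yet
        // in place.
        invariant(commitTimestamp <= durableTimestamp);

        // Every split RecoveryUnit is committed with this same (commit, durable) pair,
        // which is what makes the whole transaction visible atomically to
        // snapshot/timestamped readers.
    }
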
diff --git a/src/mongo/db/transaction/transaction_participant_test.cpp b/src/mongo/db/transaction/transaction_participant_test.cpp
index 6adb800b1a2..f38ed96ab36 100644
--- a/src/mongo/db/transaction/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction/transaction_participant_test.cpp
@@ -32,25 +32,33 @@
#include "mongo/db/concurrency/replication_state_transition_lock_guard.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/dbhelpers.h"
+#include "mongo/db/global_settings.h"
+#include "mongo/db/op_observer/op_observer_impl.h"
#include "mongo/db/op_observer/op_observer_noop.h"
#include "mongo/db/op_observer/op_observer_registry.h"
+#include "mongo/db/op_observer/oplog_writer_impl.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/mock_repl_coord_server_fixture.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/service_context.h"
#include "mongo/db/session/session_catalog.h"
#include "mongo/db/session/session_catalog_mongod.h"
+#include "mongo/db/session/session_txn_record_gen.h"
#include "mongo/db/stats/fill_locker_info.h"
+#include "mongo/db/storage/durable_history_pin.h"
#include "mongo/db/transaction/server_transactions_metrics.h"
#include "mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.h"
#include "mongo/db/transaction/transaction_participant.h"
#include "mongo/db/transaction/transaction_participant_gen.h"
#include "mongo/db/txn_retry_counter_too_old_info.h"
#include "mongo/idl/server_parameter_test_util.h"
+#include "mongo/logv2/log.h"
#include "mongo/stdx/future.h"
#include "mongo/unittest/barrier.h"
#include "mongo/unittest/death_test.h"
@@ -61,6 +69,8 @@
#include "mongo/util/net/socket_utils.h"
#include "mongo/util/tick_source_mock.h"
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
+
namespace mongo {
namespace {
@@ -261,6 +271,10 @@ protected:
TxnParticipantTest(Options options = {}) : MockReplCoordServerFixture(std::move(options)) {}
void setUp() override {
+ repl::ReplSettings replSettings;
+ replSettings.setReplSetString("realReplicaSet");
+ setGlobalReplSettings(replSettings);
+
MockReplCoordServerFixture::setUp();
const auto service = opCtx()->getServiceContext();
@@ -348,34 +362,43 @@ protected:
return opCtxSession;
}
- void checkOutSessionFromDiferentOpCtx(const LogicalSessionId& lsid,
- bool beginOrContinueTxn,
- boost::optional<TxnNumber> txnNumber = boost::none,
- boost::optional<bool> autocommit = boost::none,
- boost::optional<bool> startTransaction = boost::none,
- bool commitTxn = false) {
+ void callUnderSplitSession(const InternalSessionPool::Session& session,
+ std::function<void(OperationContext* opCtx)> fn) {
runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) {
- opCtx->setLogicalSessionId(lsid);
- if (txnNumber) {
- opCtx->setTxnNumber(*txnNumber);
- opCtx->setInMultiDocumentTransaction();
- }
+ // Prepared writes as part of a split session must be done with an
+ // `UnreplicatedWritesBlock`. This is how we mimic oplog application.
+ repl::UnreplicatedWritesBlock notReplicated(opCtx);
+ opCtx->setLogicalSessionId(session.getSessionId());
+ opCtx->setTxnNumber(session.getTxnNumber());
+ opCtx->setInMultiDocumentTransaction();
auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx);
- auto opCtxSession = mongoDSessionCatalog->checkOutSession(opCtx);
+ std::unique_ptr<MongoDSessionCatalog::Session> session =
+ mongoDSessionCatalog->checkOutSession(opCtx);
+ auto newTxnParticipant = TransactionParticipant::get(opCtx);
+ newTxnParticipant.beginOrContinueTransactionUnconditionally(opCtx,
+ {*(opCtx->getTxnNumber())});
+ newTxnParticipant.unstashTransactionResources(opCtx, "crud ops");
- if (beginOrContinueTxn) {
- auto txnParticipant = TransactionParticipant::get(opCtx);
- txnParticipant.beginOrContinue(
- opCtx, {*opCtx->getTxnNumber()}, autocommit, startTransaction);
+ fn(opCtx);
- if (commitTxn) {
- txnParticipant.commitUnpreparedTransaction(opCtx);
- }
- }
+ newTxnParticipant.stashTransactionResources(opCtx);
+ session->checkIn(opCtx, OperationContextSession::CheckInReason::kDone);
});
}
+ void assertSessionState(BSONObj obj, DurableTxnStateEnum expectedState) {
+ SessionTxnRecord txnRecord =
+ SessionTxnRecord::parse(IDLParserContext("test sessn txn parser"), obj);
+ ASSERT_EQ(expectedState, txnRecord.getState().get());
+ }
+
+ void assertNotInSessionState(BSONObj obj, DurableTxnStateEnum expectedState) {
+ SessionTxnRecord txnRecord =
+ SessionTxnRecord::parse(IDLParserContext("test sessn txn parser"), obj);
+ ASSERT_NE(expectedState, txnRecord.getState().get());
+ }
+
const LogicalSessionId _sessionId{makeLogicalSessionIdForTest()};
const TxnNumber _txnNumber{20};
const UUID _uuid = UUID::gen();
@@ -5886,6 +5909,284 @@ TEST_F(TxnParticipantTest,
}
}
+/**
+ * RAII type for operating at a timestamp. Will remove any timestamping when the object destructs.
+ */
+class OneOffRead {
+public:
+ OneOffRead(OperationContext* opCtx, const Timestamp& ts) : _opCtx(opCtx) {
+ _opCtx->recoveryUnit()->abandonSnapshot();
+ if (ts.isNull()) {
+ _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
+ } else {
+ _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, ts);
+ }
+ }
+
+ ~OneOffRead() {
+ _opCtx->recoveryUnit()->abandonSnapshot();
+ _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
+ }
+
+private:
+ OperationContext* _opCtx;
+};
+
+TEST_F(TxnParticipantTest, SplitTransactionOnPrepare) {
+ // This test simulates:
+ // 1) Preparing a transaction with multiple logical sessions as a secondary.
+ // 2) Committing the transaction as a primary.
+ // 3) Rolling back the commit.
+ //
+ // This test asserts that:
+ // A) The writes done by both sessions are committed with the same "visible" timestamp.
+ // B) The writes done by both sessions rollback, due to having a "durable" timestamp that's
+ // ahead of the stable timestamp.
+ // C) The `config.transactions` table is not aware that the split sessions were part of a
+ // prepared transaction. One example of why this invariant is important is because
+ // reconstructing prepared transactions finds all `config.transactions` entries in the
+ // prepared state. We must ensure any prepared transaction is only reconstructed once.
+ //
+ // First we set up infrastructure such that we can simulate oplog application, primary behaviors
+ // and `rollbackToStable`.
+ OperationContext* opCtx = this->opCtx();
+ DurableHistoryRegistry::set(opCtx->getServiceContext(),
+ std::make_unique<DurableHistoryRegistry>());
+ opCtx->getServiceContext()->setOpObserver(
+ std::make_unique<OpObserverImpl>(std::make_unique<OplogWriterImpl>()));
+
+ OpDebug* const nullOpDbg = nullptr;
+
+ dynamic_cast<repl::ReplicationCoordinatorMock*>(repl::ReplicationCoordinator::get(opCtx))
+ ->setUpdateCommittedSnapshot(false);
+ // Advance the term from 0 to 1 for familiarity.
+ ASSERT_OK(repl::ReplicationCoordinator::get(opCtx)->updateTerm(opCtx, 1));
+
+ // Bump the logical clock for easier visual cues.
+ const Timestamp startTs(100, 1);
+ auto oplogInfo = LocalOplogInfo::get(opCtx);
+ oplogInfo->setNewTimestamp(opCtx->getServiceContext(), startTs);
+ opCtx->getServiceContext()->getStorageEngine()->setInitialDataTimestamp(startTs);
+
+ // Assign the variables that represent the "real", client-facing logical session.
+ std::unique_ptr<MongoDSessionCatalog::Session> userSession = checkOutSession();
+ TransactionParticipant::Participant userTxnParticipant = TransactionParticipant::get(opCtx);
+
+ // TxnResources start in the "stashed" state.
+ userTxnParticipant.unstashTransactionResources(opCtx, "crud ops");
+
+ // Hold the collection lock/datastructure such that it can be released prior to rollback.
+ boost::optional<AutoGetCollection> userColl;
+ userColl.emplace(opCtx, kNss, LockMode::MODE_IX);
+
+ // We split our user session into 2 split sessions.
+ const int numSplits = 2;
+ auto* splitPrepareManager =
+ repl::ReplicationCoordinator::get(opCtx)->getSplitPrepareSessionManager();
+ const std::vector<InternalSessionPool::Session>& splitSessions =
+ splitPrepareManager->splitSession(
+ opCtx->getLogicalSessionId().get(), opCtx->getTxnNumber().get(), numSplits);
+ // Insert an `_id: 1` document.
+ callUnderSplitSession(splitSessions[0], [nullOpDbg](OperationContext* opCtx) {
+ AutoGetCollection userColl(opCtx, kNss, LockMode::MODE_IX);
+ ASSERT_OK(
+ collection_internal::insertDocument(opCtx,
+ userColl.getCollection(),
+ InsertStatement(BSON("_id" << 1 << "value" << 1)),
+ nullOpDbg));
+ });
+
+ // Insert an `_id: 2` document.
+ callUnderSplitSession(splitSessions[1], [nullOpDbg](OperationContext* opCtx) {
+ AutoGetCollection userColl(opCtx, kNss, LockMode::MODE_IX);
+ ASSERT_OK(
+ collection_internal::insertDocument(opCtx,
+ userColl.getCollection(),
+ InsertStatement(BSON("_id" << 2 << "value" << 1)),
+ nullOpDbg));
+ });
+
+ // Update `2` to increment its `value` to 2. This must be done in the same split session as the
+ // insert.
+ callUnderSplitSession(splitSessions[1], [nullOpDbg](OperationContext* opCtx) {
+ AutoGetCollection userColl(opCtx, kNss, LockMode::MODE_IX);
+ Helpers::update(
+ opCtx, userColl->ns(), BSON("_id" << 2), BSON("$inc" << BSON("value" << 1)));
+ });
+
+
+ // Mimic the method calls a secondary makes when performing a split prepare. Those calls are
+ // made inside an `UnreplicatedWritesBlock` and explicitly pass in the prepare OpTime.
+ const Timestamp prepTs = startTs;
+ const repl::OpTime prepOpTime(prepTs, 1);
+ callUnderSplitSession(splitSessions[0], [prepOpTime](OperationContext* opCtx) {
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ txnParticipant.prepareTransaction(opCtx, prepOpTime);
+ });
+ callUnderSplitSession(splitSessions[1], [prepOpTime](OperationContext* opCtx) {
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ txnParticipant.prepareTransaction(opCtx, prepOpTime);
+ });
+
+ // Normally this would also be called on a secondary under an `UnreplicatedWritesBlock`.
+ // However, we must change the `config.transactions` state for this logical session. In
+ // production, that transaction table write would come via a synthetic oplog entry.
+ userTxnParticipant.prepareTransaction(opCtx, prepOpTime);
+
+ // Prepared transactions have their commit/visible timestamp dictated by the transaction
+ // coordinator. However, the commit function will assign a "durable" timestamp derived from
+ // the system clock. This is the code path of a primary.
+ const Timestamp visibleTs = prepTs + 100;
+
+ // Committing a transaction as a primary does not allow direct control over the oplog/durable
+ // timestamp being allocated. We want to observe that we correctly set a durable timestamp when
+ // a primary commits a split prepare. Ultimately we'll observe that by seeing the transaction
+ // roll back on a call to `rollbackToStable`. We can coerce the durable timestamp by bumping the
+ // logical clock. Worth noting, the test reads from the system clock, which is expected to
+ // choose a value for the transaction's durable timestamp that is much larger than
+ // `chosenStableTimestamp`. We only bump the clock here for the very unexpected case that the
+ // system clock is within a few hundred seconds of the epoch.
+ const Timestamp chosenStableTimestamp = visibleTs + 10;
+ oplogInfo->setNewTimestamp(opCtx->getServiceContext(), chosenStableTimestamp + 1);
+ userTxnParticipant.commitPreparedTransaction(opCtx, visibleTs, boost::none);
+ ASSERT_LT(chosenStableTimestamp, userTxnParticipant.getLastWriteOpTime().getTimestamp());
+
+ // By default, the `findOne` helpers invariant if no result is found.
+ const bool invariantOnError = true;
+ // Print out reads at the interesting times prior to asserting for diagnostics.
+ for (const auto& ts : std::vector<Timestamp>{prepTs, visibleTs}) {
+ OneOffRead oor(opCtx, ts);
+ LOGV2(7274401,
+ "Pre-rollback values",
+ "readTimestamp"_attr = ts,
+ "Doc(_id: 1)"_attr = Helpers::findOneForTesting(
+ opCtx, userColl->getCollection(), BSON("_id" << 1), !invariantOnError),
+ "Doc(_id: 2)"_attr = Helpers::findOneForTesting(
+ opCtx, userColl->getCollection(), BSON("_id" << 2), !invariantOnError));
+ }
+
+ // Reading at the prepare timestamp should not see anything.
+ {
+ OneOffRead oor(opCtx, prepTs);
+ opCtx->recoveryUnit()->setPrepareConflictBehavior(
+ PrepareConflictBehavior::kIgnoreConflicts);
+ ASSERT_BSONOBJ_EQ(
+ BSONObj::kEmptyObject,
+ Helpers::findOneForTesting(
+ opCtx, userColl->getCollection(), BSON("_id" << 1), !invariantOnError));
+ ASSERT_BSONOBJ_EQ(
+ BSONObj::kEmptyObject,
+ Helpers::findOneForTesting(
+ opCtx, userColl->getCollection(), BSON("_id" << 2), !invariantOnError));
+ }
+
+ // Reading at the visible/commit timestamp should see the inserted and updated documents.
+ {
+ OneOffRead oor(opCtx, visibleTs);
+ ASSERT_BSONOBJ_EQ(
+ BSON("_id" << 1 << "value" << 1),
+ Helpers::findOneForTesting(
+ opCtx, userColl->getCollection(), BSON("_id" << 1), !invariantOnError));
+ ASSERT_BSONOBJ_EQ(
+ BSON("_id" << 2 << "value" << 2),
+ Helpers::findOneForTesting(
+ opCtx, userColl->getCollection(), BSON("_id" << 2), !invariantOnError));
+ }
+
+ {
+ // We also assert that the user transaction record is in the committed state.
+ AutoGetCollection configTransactions(
+ opCtx, NamespaceString::kSessionTransactionsTableNamespace, LockMode::MODE_IS);
+ // The user config.transactions document must exist and must be in the "committed" state.
+ BSONObj userTxnObj =
+ Helpers::findOneForTesting(opCtx,
+ configTransactions.getCollection(),
+ BSON("_id.id" << opCtx->getLogicalSessionId()->getId()),
+ !invariantOnError);
+ ASSERT(!userTxnObj.isEmpty());
+ assertSessionState(userTxnObj, DurableTxnStateEnum::kCommitted);
+
+ // Rather than testing the implementation, we'll assert on the weakest necessary state. A
+ // split `config.transactions` document may or may not exist. If it exists, it must be
+ // in the "committed" state.
+ for (auto idx = 0; idx < numSplits; ++idx) {
+ BSONObj splitTxnObj = Helpers::findOneForTesting(
+ opCtx,
+ configTransactions.getCollection(),
+ BSON("_id.id" << splitSessions[idx].getSessionId().getId()),
+ !invariantOnError);
+ if (!splitTxnObj.isEmpty()) {
+ assertSessionState(splitTxnObj, DurableTxnStateEnum::kCommitted);
+ }
+ }
+ }
+
+ // Unlock the collection and check in the session to release locks.
+ userColl = boost::none;
+ userSession.reset();
+
+ // Rollback such that the commit oplog entry and the effects of the transaction are rolled
+ // back.
+ opCtx->getServiceContext()->getStorageEngine()->setStableTimestamp(chosenStableTimestamp);
+ {
+ Lock::GlobalLock globalLock(opCtx, LockMode::MODE_X);
+ ASSERT_OK(opCtx->getServiceContext()->getStorageEngine()->recoverToStableTimestamp(opCtx));
+ }
+
+ // Again, display read values for diagnostics.
+ userColl.emplace(opCtx, kNss, LockMode::MODE_IX);
+ for (const auto& ts : std::vector<Timestamp>{prepTs, visibleTs}) {
+ OneOffRead oor(opCtx, ts);
+ LOGV2(7274402,
+ "Post-rollback values",
+ "readTimestamp"_attr = ts,
+ "Doc(_id: 1)"_attr = Helpers::findOneForTesting(
+ opCtx, userColl->getCollection(), BSON("_id" << 1), !invariantOnError),
+ "Doc(_id: 2)"_attr = Helpers::findOneForTesting(
+ opCtx, userColl->getCollection(), BSON("_id" << 2), !invariantOnError));
+ }
+
+ // Now when we read at the commit/visible timestamp, the documents must not exist.
+ {
+ OneOffRead oor(opCtx, visibleTs);
+ ASSERT_BSONOBJ_EQ(
+ BSONObj::kEmptyObject,
+ Helpers::findOneForTesting(
+ opCtx, userColl->getCollection(), BSON("_id" << 1), !invariantOnError));
+ ASSERT_BSONOBJ_EQ(
+ BSONObj::kEmptyObject,
+ Helpers::findOneForTesting(
+ opCtx, userColl->getCollection(), BSON("_id" << 2), !invariantOnError));
+ }
+
+ // We also assert that the user transaction record is in the prepared state. The split sessions
+ // are not.
+ AutoGetCollection configTransactions(
+ opCtx, NamespaceString::kSessionTransactionsTableNamespace, LockMode::MODE_IS);
+ // The user `config.transactions` document must be in the "prepared" state.
+ BSONObj userTxnObj =
+ Helpers::findOneForTesting(opCtx,
+ configTransactions.getCollection(),
+ BSON("_id.id" << opCtx->getLogicalSessionId()->getId()),
+ !invariantOnError);
+ ASSERT(!userTxnObj.isEmpty());
+ assertSessionState(userTxnObj, DurableTxnStateEnum::kPrepared);
+
+ // Rather than testing the implementation, we'll assert on the weakest necessary state. A split
+ // `config.transactions` document may or may not exist. If it exists, it must *not* be in the
+ // "prepared" state.
+ for (auto idx = 0; idx < numSplits; ++idx) {
+ BSONObj splitTxnObj =
+ Helpers::findOneForTesting(opCtx,
+ configTransactions.getCollection(),
+ BSON("_id.id" << splitSessions[idx].getSessionId().getId()),
+ !invariantOnError);
+ if (!splitTxnObj.isEmpty()) {
+ assertNotInSessionState(splitTxnObj, DurableTxnStateEnum::kPrepared);
+ }
+ }
+}
+
bool doesExistInCatalog(const LogicalSessionId& lsid, SessionCatalog* sessionCatalog) {
bool existsInCatalog{false};
sessionCatalog->scanSession(lsid,