summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Chan <jason.chan@10gen.com>2019-03-25 14:04:50 -0400
committerJason Chan <jason.chan@10gen.com>2019-03-25 14:18:15 -0400
commitf8bf360e86ff96a0636575af7d1cee8ae9f0c3f0 (patch)
treefca8003369076ab69c7be139e8d726538e78e944
parent35374f25e2bc3fca9b42958ffc98032cf31f53a1 (diff)
downloadmongo-f8bf360e86ff96a0636575af7d1cee8ae9f0c3f0.tar.gz
SERVER-39792 Update the txn table only on the first txn operation on primary
-rw-r--r--src/mongo/db/op_observer_impl.cpp82
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp34
-rw-r--r--src/mongo/db/transaction_participant.cpp21
-rw-r--r--src/mongo/db/transaction_participant.h6
-rw-r--r--src/mongo/db/transaction_participant_retryable_writes_test.cpp21
-rw-r--r--src/mongo/dbtests/storage_timestamp_tests.cpp6
6 files changed, 128 insertions, 42 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index d12bfe0548b..e05ebcec0f9 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -123,7 +123,8 @@ void onWriteOpCompleted(OperationContext* opCtx,
std::vector<StmtId> stmtIdsWritten,
const repl::OpTime& lastStmtIdWriteOpTime,
Date_t lastStmtIdWriteDate,
- boost::optional<DurableTxnStateEnum> txnState) {
+ boost::optional<DurableTxnStateEnum> txnState,
+ boost::optional<repl::OpTime> startOpTime) {
if (lastStmtIdWriteOpTime.isNull())
return;
@@ -136,7 +137,8 @@ void onWriteOpCompleted(OperationContext* opCtx,
std::move(stmtIdsWritten),
lastStmtIdWriteOpTime,
lastStmtIdWriteDate,
- txnState);
+ txnState,
+ startOpTime);
}
/**
@@ -527,7 +529,13 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
std::back_inserter(stmtIdsWritten),
[](const InsertStatement& stmt) { return stmt.stmtId; });
- onWriteOpCompleted(opCtx, nss, stmtIdsWritten, lastOpTime, lastWriteDate, boost::none);
+ onWriteOpCompleted(opCtx,
+ nss,
+ stmtIdsWritten,
+ lastOpTime,
+ lastWriteDate,
+ boost::none,
+ boost::none /* startOpTime */);
}
size_t index = 0;
@@ -589,7 +597,8 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
std::vector<StmtId>{args.updateArgs.stmtId},
opTime.writeOpTime,
opTime.wallClockTime,
- boost::none);
+ boost::none,
+ boost::none /* startOpTime */);
}
if (args.nss != NamespaceString::kSessionTransactionsTableNamespace) {
@@ -650,7 +659,8 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
std::vector<StmtId>{stmtId},
opTime.writeOpTime,
opTime.wallClockTime,
- boost::none);
+ boost::none,
+ boost::none /* startOpTime */);
}
if (nss != NamespaceString::kSessionTransactionsTableNamespace) {
@@ -1045,8 +1055,13 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
return prepare ? DurableTxnStateEnum::kPrepared : DurableTxnStateEnum::kCommitted;
}();
- onWriteOpCompleted(
- opCtx, cmdNss, {stmtId}, times.writeOpTime, times.wallClockTime, txnState);
+ onWriteOpCompleted(opCtx,
+ cmdNss,
+ {stmtId},
+ times.writeOpTime,
+ times.wallClockTime,
+ txnState,
+ boost::none /* startOpTime */);
return times;
} catch (const AssertionException& e) {
// Change the error code to TransactionTooLarge if it is BSONObjectTooLarge.
@@ -1114,17 +1129,21 @@ void logOplogEntriesForTransaction(OperationContext* opCtx,
stmtId = 0;
const NamespaceString cmdNss{"admin", "$cmd"};
auto oplogSlot = oplogSlots.begin();
+ const auto startOpTime = oplogSlot->opTime;
for (const auto& stmt : stmts) {
+ bool isStartOfTxn = prevWriteOpTime.writeOpTime.isNull();
prevWriteOpTime = logReplOperationForTransaction(
opCtx, sessionInfo, prevWriteOpTime.writeOpTime, stmtId, stmt, *oplogSlot++);
- // This will update the transaction table for each oplog entry, so a read at any
- // given timestamp in the transaction table will return the correct state.
- onWriteOpCompleted(opCtx,
- cmdNss,
- {stmtId},
- prevWriteOpTime.writeOpTime,
- prevWriteOpTime.wallClockTime,
- DurableTxnStateEnum::kInProgress);
+ if (isStartOfTxn) {
+ // Update the transaction table only on the first transaction oplog entry.
+ onWriteOpCompleted(opCtx,
+ cmdNss,
+ {stmtId},
+ prevWriteOpTime.writeOpTime,
+ prevWriteOpTime.wallClockTime,
+ DurableTxnStateEnum::kInProgress,
+ startOpTime);
+ }
stmtId++;
}
wuow.commit();
@@ -1183,7 +1202,13 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx,
oplogSlot);
invariant(oplogSlot.opTime.isNull() || oplogSlot.opTime == oplogOpTime);
- onWriteOpCompleted(opCtx, cmdNss, {stmtId}, oplogOpTime, wallClockTime, durableState);
+ onWriteOpCompleted(opCtx,
+ cmdNss,
+ {stmtId},
+ oplogOpTime,
+ wallClockTime,
+ durableState,
+ boost::none /* startOpTime */);
wuow.commit();
});
}
@@ -1223,8 +1248,13 @@ repl::OpTime logCommitForUnpreparedTransaction(OperationContext* opCtx,
// is not enforced at this level.
invariant(oplogSlot.opTime.isNull() || oplogSlot.opTime == oplogOpTime);
- onWriteOpCompleted(
- opCtx, cmdNss, {stmtId}, oplogOpTime, wallClockTime, DurableTxnStateEnum::kCommitted);
+ onWriteOpCompleted(opCtx,
+ cmdNss,
+ {stmtId},
+ oplogOpTime,
+ wallClockTime,
+ DurableTxnStateEnum::kCommitted,
+ boost::none /* startOpTime */);
return oplogSlot.opTime;
}
@@ -1287,7 +1317,8 @@ void OpObserverImpl::onPreparedTransactionCommit(
repl::OpTime logPrepareTransaction(OperationContext* opCtx,
StmtId stmtId,
const repl::OpTime& prevOpTime,
- const OplogSlot oplogSlot) {
+ const OplogSlot oplogSlot,
+ const repl::OpTime& startOpTime) {
const NamespaceString cmdNss{"admin", "$cmd"};
const auto wallClockTime = getWallClockTimeForOpLog(opCtx);
@@ -1315,8 +1346,13 @@ repl::OpTime logPrepareTransaction(OperationContext* opCtx,
oplogSlot);
invariant(oplogSlot.opTime == oplogOpTime);
- onWriteOpCompleted(
- opCtx, cmdNss, {stmtId}, oplogOpTime, wallClockTime, DurableTxnStateEnum::kPrepared);
+ onWriteOpCompleted(opCtx,
+ cmdNss,
+ {stmtId},
+ oplogOpTime,
+ wallClockTime,
+ DurableTxnStateEnum::kPrepared,
+ startOpTime /* startOpTime */);
return oplogSlot.opTime;
}
@@ -1374,8 +1410,10 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
// The prevOpTime is the OpTime of the second last entry in the reserved slots.
prevOpTime = reservedSlots.rbegin()[1].opTime;
}
+ auto startTxnSlot = reservedSlots.front();
+ const auto startOpTime = startTxnSlot.opTime;
logPrepareTransaction(
- opCtx, statements.size() /* stmtId */, prevOpTime, prepareOpTime);
+ opCtx, statements.size() /* stmtId */, prevOpTime, prepareOpTime, startOpTime);
wuow.commit();
});
}
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index 3bc545ebba6..89eb3e49f08 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -460,7 +460,7 @@ public:
WriteUnitOfWork wuow(opCtx);
auto opTime = repl::OpTime(Timestamp(10, 1), 1); // Dummy timestamp.
txnParticipant.onWriteOpCompletedOnPrimary(
- opCtx, txnNum, {stmtId}, opTime, Date_t::now(), boost::none);
+ opCtx, txnNum, {stmtId}, opTime, Date_t::now(), boost::none, boost::none);
wuow.commit();
}
}
@@ -630,6 +630,26 @@ protected:
ASSERT(!cursor->more());
}
+ void assertTxnRecordStartOpTime(boost::optional<repl::OpTime> startOpTime) {
+ DBDirectClient client(opCtx());
+ auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace,
+ {BSON("_id" << session()->getSessionId().toBSON())});
+ ASSERT(cursor);
+ ASSERT(cursor->more());
+
+ auto txnRecordObj = cursor->next();
+ auto txnRecord =
+ SessionTxnRecord::parse(IDLParserErrorContext("SessionEntryWritten"), txnRecordObj);
+ ASSERT(!cursor->more());
+ ASSERT_EQ(session()->getSessionId(), txnRecord.getSessionId());
+ if (!startOpTime) {
+ ASSERT(!txnRecord.getStartOpTime());
+ } else {
+ ASSERT(txnRecord.getStartOpTime());
+ ASSERT_EQ(*startOpTime, *txnRecord.getStartOpTime());
+ }
+ }
+
Session* session() {
return OperationContextSession::get(opCtx());
}
@@ -1567,11 +1587,14 @@ TEST_F(OpObserverMultiEntryTransactionTest,
auto prepareEntryObj = oplogEntryObjs.back();
const auto prepareOplogEntry = assertGet(OplogEntry::parse(prepareEntryObj));
checkSessionAndTransactionFields(prepareEntryObj, 0);
+ // The startOpTime should refer to the prepare oplog entry for an empty transaction.
+ const auto startOpTime = prepareOplogEntry.getOpTime();
ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp());
ASSERT_BSONOBJ_EQ(BSON("prepareTransaction" << 1), prepareOplogEntry.getObject());
txnParticipant.stashTransactionResources(opCtx());
assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
+ assertTxnRecordStartOpTime(startOpTime);
txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction");
ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime());
@@ -1853,6 +1876,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) {
const auto insertEntry = assertGet(OplogEntry::parse(oplogEntryObjs[0]));
ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kInsert);
+ const auto startOpTime = insertEntry.getOpTime();
+
const auto prepareTimestamp = prepareOpTime.getTimestamp();
const auto prepareEntry = assertGet(OplogEntry::parse(oplogEntryObjs[1]));
@@ -1865,6 +1890,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) {
ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime());
txnParticipant.stashTransactionResources(opCtx());
assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
+ assertTxnRecordStartOpTime(startOpTime);
txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction");
// Mimic committing the transaction.
@@ -1893,6 +1919,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) {
ASSERT_BSONOBJ_EQ(oExpected, o);
assertTxnRecord(txnNum(), commitSlot.opTime, DurableTxnStateEnum::kCommitted);
+ // startTimestamp should no longer be set once the transaction has been committed.
+ assertTxnRecordStartOpTime(boost::none);
}
TEST_F(OpObserverMultiEntryTransactionTest, AbortPreparedTest) {
@@ -1923,6 +1951,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, AbortPreparedTest) {
const auto insertEntry = assertGet(OplogEntry::parse(oplogEntryObjs[0]));
ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kInsert);
+ const auto startOpTime = insertEntry.getOpTime();
const auto prepareTimestamp = prepareOpTime.getTimestamp();
@@ -1936,6 +1965,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, AbortPreparedTest) {
ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime());
txnParticipant.stashTransactionResources(opCtx());
assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
+ assertTxnRecordStartOpTime(startOpTime);
txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction");
// Mimic aborting the transaction by resetting the WUOW.
@@ -1955,6 +1985,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, AbortPreparedTest) {
ASSERT_BSONOBJ_EQ(oExpected, o);
assertTxnRecord(txnNum(), abortSlot.opTime, DurableTxnStateEnum::kAborted);
+ // startOpTime should no longer be set once a transaction has been aborted.
+ assertTxnRecordStartOpTime(boost::none);
}
} // namespace
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 16cec7c5e7a..773b9d03ccd 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -1865,7 +1865,8 @@ void TransactionParticipant::Participant::onWriteOpCompletedOnPrimary(
std::vector<StmtId> stmtIdsWritten,
const repl::OpTime& lastStmtIdWriteOpTime,
Date_t lastStmtIdWriteDate,
- boost::optional<DurableTxnStateEnum> txnState) {
+ boost::optional<DurableTxnStateEnum> txnState,
+ boost::optional<repl::OpTime> startOpTime) {
invariant(opCtx->lockState()->inAWriteUnitOfWork());
invariant(txnNumber == o().activeTxnNumber);
@@ -1879,7 +1880,7 @@ void TransactionParticipant::Participant::onWriteOpCompletedOnPrimary(
}
const auto updateRequest =
- _makeUpdateRequest(lastStmtIdWriteOpTime, lastStmtIdWriteDate, txnState);
+ _makeUpdateRequest(lastStmtIdWriteOpTime, lastStmtIdWriteDate, txnState, startOpTime);
repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx);
@@ -1898,8 +1899,8 @@ void TransactionParticipant::Participant::onMigrateCompletedOnPrimary(
// We do not migrate transaction oplog entries so don't set the txn state
const auto txnState = boost::none;
- const auto updateRequest =
- _makeUpdateRequest(lastStmtIdWriteOpTime, oplogLastStmtIdWriteDate, txnState);
+ const auto updateRequest = _makeUpdateRequest(
+ lastStmtIdWriteOpTime, oplogLastStmtIdWriteDate, txnState, boost::none /* startOpTime */);
repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx);
@@ -2027,7 +2028,8 @@ boost::optional<repl::OpTime> TransactionParticipant::Participant::_checkStateme
UpdateRequest TransactionParticipant::Participant::_makeUpdateRequest(
const repl::OpTime& newLastWriteOpTime,
Date_t newLastWriteDate,
- boost::optional<DurableTxnStateEnum> newState) const {
+ boost::optional<DurableTxnStateEnum> newState,
+ boost::optional<repl::OpTime> startOpTime) const {
UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace);
const auto updateBSON = [&] {
@@ -2037,10 +2039,15 @@ UpdateRequest TransactionParticipant::Participant::_makeUpdateRequest(
newTxnRecord.setLastWriteOpTime(newLastWriteOpTime);
newTxnRecord.setLastWriteDate(newLastWriteDate);
newTxnRecord.setState(newState);
- if (newState == DurableTxnStateEnum::kPrepared) {
+ if (gUseMultipleOplogEntryFormatForTransactions && startOpTime) {
+ // The startOpTime should only be set when transitioning the txn to in-progress or
+ // prepared.
+ invariant(newState == DurableTxnStateEnum::kInProgress ||
+ newState == DurableTxnStateEnum::kPrepared);
+ newTxnRecord.setStartOpTime(*startOpTime);
+ } else if (newState == DurableTxnStateEnum::kPrepared) {
newTxnRecord.setStartOpTime(o().prepareOpTime);
}
-
return newTxnRecord.toBSON();
}();
updateRequest.setUpdates(updateBSON);
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index 2cbb0aaaf7d..974e0e2fa6b 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -535,7 +535,8 @@ public:
std::vector<StmtId> stmtIdsWritten,
const repl::OpTime& lastStmtIdWriteOpTime,
Date_t lastStmtIdWriteDate,
- boost::optional<DurableTxnStateEnum> txnState);
+ boost::optional<DurableTxnStateEnum> txnState,
+ boost::optional<repl::OpTime> startOpTime);
/**
* Called after an entry for the specified session and transaction has been written to the
@@ -655,7 +656,8 @@ public:
UpdateRequest _makeUpdateRequest(const repl::OpTime& newLastWriteOpTime,
Date_t newLastWriteDate,
- boost::optional<DurableTxnStateEnum> newState) const;
+ boost::optional<DurableTxnStateEnum> newState,
+ boost::optional<repl::OpTime> startOpTime) const;
void _registerUpdateCacheOnCommit(OperationContext* opCtx,
std::vector<StmtId> stmtIdsWritten,
diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
index 6f51d4ddf7c..84a9da1c91f 100644
--- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp
+++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
@@ -238,7 +238,7 @@ protected:
const auto opTime =
logOp(opCtx(), kNss, uuid, session->getSessionId(), txnNum, stmtId, prevOpTime);
txnParticipant.onWriteOpCompletedOnPrimary(
- opCtx(), txnNum, {stmtId}, opTime, Date_t::now(), txnState);
+ opCtx(), txnNum, {stmtId}, opTime, Date_t::now(), txnState, boost::none);
wuow.commit();
return opTime;
@@ -398,7 +398,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionTransactionsCollectionN
const auto uuid = UUID::gen();
const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, 0);
ASSERT_THROWS(txnParticipant.onWriteOpCompletedOnPrimary(
- opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none),
+ opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none, boost::none),
AssertionException);
}
@@ -457,7 +457,7 @@ DEATH_TEST_F(TransactionParticipantRetryableWritesTest,
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, 0);
txnParticipant.onWriteOpCompletedOnPrimary(
- opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none);
+ opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none, boost::none);
wuow.commit();
}
@@ -466,7 +466,7 @@ DEATH_TEST_F(TransactionParticipantRetryableWritesTest,
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum - 1, 0);
txnParticipant.onWriteOpCompletedOnPrimary(
- opCtx(), txnNum - 1, {0}, opTime, Date_t::now(), boost::none);
+ opCtx(), txnNum - 1, {0}, opTime, Date_t::now(), boost::none, boost::none);
}
}
@@ -486,7 +486,7 @@ DEATH_TEST_F(TransactionParticipantRetryableWritesTest,
txnParticipant.invalidate(opCtx());
txnParticipant.onWriteOpCompletedOnPrimary(
- opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none);
+ opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none, boost::none);
}
TEST_F(TransactionParticipantRetryableWritesTest, IncompleteHistoryDueToOpLogTruncation) {
@@ -591,7 +591,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, ErrorOnlyWhenStmtIdBeingChecke
false /* inTxn */,
OplogSlot());
txnParticipant.onWriteOpCompletedOnPrimary(
- opCtx(), txnNum, {1}, opTime, wallClockTime, boost::none);
+ opCtx(), txnNum, {1}, opTime, wallClockTime, boost::none, boost::none);
wuow.commit();
return opTime;
@@ -621,8 +621,13 @@ TEST_F(TransactionParticipantRetryableWritesTest, ErrorOnlyWhenStmtIdBeingChecke
false /* inTxn */,
OplogSlot());
- txnParticipant.onWriteOpCompletedOnPrimary(
- opCtx(), txnNum, {kIncompleteHistoryStmtId}, opTime, wallClockTime, boost::none);
+ txnParticipant.onWriteOpCompletedOnPrimary(opCtx(),
+ txnNum,
+ {kIncompleteHistoryStmtId},
+ opTime,
+ wallClockTime,
+ boost::none,
+ boost::none);
wuow.commit();
}
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index c4e2e35c856..95afd85a963 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -2914,7 +2914,8 @@ public:
sessionInfo = getSessionTxnInfoAtTimestamp(secondOplogEntryTs, true);
ASSERT_EQ(sessionInfo["state"].String(), "inProgress");
- ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), secondOplogEntryTs);
+ // The transaction table is only updated at the start of the transaction.
+ ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), firstOplogEntryTs);
sessionInfo = getSessionTxnInfoAtTimestamp(commitEntryTs, true);
ASSERT_EQ(sessionInfo["state"].String(), "committed");
@@ -3091,7 +3092,8 @@ public:
sessionInfo = getSessionTxnInfoAtTimestamp(secondOplogEntryTs, true);
ASSERT_EQ(sessionInfo["state"].String(), "inProgress");
- ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), secondOplogEntryTs);
+ // The transaction table is only updated at the start of the transaction.
+ ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), firstOplogEntryTs);
sessionInfo = getSessionTxnInfoAtTimestamp(prepareEntryTs, true);
ASSERT_EQ(sessionInfo["state"].String(), "prepared");