summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/op_observer_impl.cpp18
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp3
-rw-r--r--src/mongo/db/repl/sync_tail.cpp5
-rw-r--r--src/mongo/db/session.cpp21
-rw-r--r--src/mongo/db/session.h8
-rw-r--r--src/mongo/db/session_test.cpp135
-rw-r--r--src/mongo/db/session_txn_record.idl13
7 files changed, 139 insertions, 64 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index ccf3bc6e138..9117ce32e6c 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -108,7 +108,8 @@ void onWriteOpCompleted(OperationContext* opCtx,
Session* session,
std::vector<StmtId> stmtIdsWritten,
const repl::OpTime& lastStmtIdWriteOpTime,
- Date_t lastStmtIdWriteDate) {
+ Date_t lastStmtIdWriteDate,
+ boost::optional<DurableTxnStateEnum> txnState) {
if (lastStmtIdWriteOpTime.isNull())
return;
@@ -117,7 +118,8 @@ void onWriteOpCompleted(OperationContext* opCtx,
*opCtx->getTxnNumber(),
std::move(stmtIdsWritten),
lastStmtIdWriteOpTime,
- lastStmtIdWriteDate);
+ lastStmtIdWriteDate,
+ txnState);
}
}
@@ -412,7 +414,8 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
std::back_inserter(stmtIdsWritten),
[](const InsertStatement& stmt) { return stmt.stmtId; });
- onWriteOpCompleted(opCtx, nss, session, stmtIdsWritten, lastOpTime, lastWriteDate);
+ onWriteOpCompleted(
+ opCtx, nss, session, stmtIdsWritten, lastOpTime, lastWriteDate, boost::none);
}
auto* const css = (nss == NamespaceString::kSessionTransactionsTableNamespace || fromMigrate)
@@ -482,7 +485,8 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
session,
std::vector<StmtId>{args.stmtId},
opTime.writeOpTime,
- opTime.wallClockTime);
+ opTime.wallClockTime,
+ boost::none);
}
AuthorizationManager::get(opCtx->getServiceContext())
@@ -541,7 +545,8 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
session,
std::vector<StmtId>{stmtId},
opTime.writeOpTime,
- opTime.wallClockTime);
+ opTime.wallClockTime,
+ boost::none);
}
AuthorizationManager::get(opCtx->getServiceContext())
@@ -954,8 +959,9 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
auto times = replLogApplyOps(
opCtx, cmdNss, applyOpCmd, sessionInfo, stmtId, oplogLink, prepare, prepareOplogSlot);
+ auto txnState = prepare ? DurableTxnStateEnum::kPrepared : DurableTxnStateEnum::kCommitted;
onWriteOpCompleted(
- opCtx, cmdNss, session, {stmtId}, times.writeOpTime, times.wallClockTime);
+ opCtx, cmdNss, session, {stmtId}, times.writeOpTime, times.wallClockTime, txnState);
return times;
} catch (const AssertionException& e) {
// Change the error code to TransactionTooLarge if it is BSONObjectTooLarge.
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index 3bf8a4bf967..2fb8003bf46 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -320,7 +320,8 @@ public:
AutoGetCollection autoColl(opCtx, nss, MODE_IX);
WriteUnitOfWork wuow(opCtx);
auto opTime = repl::OpTime(Timestamp(10, 1), 1); // Dummy timestamp.
- session->onWriteOpCompletedOnPrimary(opCtx, txnNum, {stmtId}, opTime, Date_t::now());
+ session->onWriteOpCompletedOnPrimary(
+ opCtx, txnNum, {stmtId}, opTime, Date_t::now(), boost::none);
wuow.commit();
}
}
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index ad8f5ee1848..5e7c7b57742 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -545,8 +545,9 @@ void fillWriterVectors(OperationContext* opCtx,
}
try {
derivedOps->emplace_back(ApplyOps::extractOperations(op));
- fillWriterVectors(
- opCtx, &derivedOps->back(), writerVectors, derivedOps, sessionUpdateTracker);
+
+ // Nested entries cannot have different session updates.
+ fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
} catch (...) {
fassertFailedWithStatusNoTrace(
50711,
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index 217aa0bab62..b7478ce0ee3 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -285,7 +285,8 @@ void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx,
TxnNumber txnNumber,
std::vector<StmtId> stmtIdsWritten,
const repl::OpTime& lastStmtIdWriteOpTime,
- Date_t lastStmtIdWriteDate) {
+ Date_t lastStmtIdWriteDate,
+ boost::optional<DurableTxnStateEnum> txnState) {
invariant(opCtx->lockState()->inAWriteUnitOfWork());
stdx::unique_lock<stdx::mutex> ul(_mutex);
@@ -300,7 +301,7 @@ void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx,
}
const auto updateRequest =
- _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, lastStmtIdWriteDate);
+ _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, lastStmtIdWriteDate, txnState);
ul.unlock();
@@ -354,8 +355,10 @@ void Session::onMigrateCompletedOnPrimary(OperationContext* opCtx,
const auto updatedLastStmtIdWriteDate =
txnLastStmtIdWriteDate == Date_t::min() ? oplogLastStmtIdWriteDate : txnLastStmtIdWriteDate;
- const auto updateRequest =
- _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, updatedLastStmtIdWriteDate);
+ // We do not migrate transaction oplog entries.
+ auto txnState = boost::none;
+ const auto updateRequest = _makeUpdateRequest(
+ ul, txnNumber, lastStmtIdWriteOpTime, updatedLastStmtIdWriteDate, txnState);
ul.unlock();
@@ -519,7 +522,8 @@ Date_t Session::_getLastWriteDate(WithLock wl, TxnNumber txnNumber) const {
UpdateRequest Session::_makeUpdateRequest(WithLock,
TxnNumber newTxnNumber,
const repl::OpTime& newLastWriteOpTime,
- Date_t newLastWriteDate) const {
+ Date_t newLastWriteDate,
+ boost::optional<DurableTxnStateEnum> newState) const {
UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace);
const auto updateBSON = [&] {
@@ -528,6 +532,7 @@ UpdateRequest Session::_makeUpdateRequest(WithLock,
newTxnRecord.setTxnNum(newTxnNumber);
newTxnRecord.setLastWriteOpTime(newLastWriteOpTime);
newTxnRecord.setLastWriteDate(newLastWriteDate);
+ newTxnRecord.setState(newState);
return newTxnRecord.toBSON();
}();
updateRequest.setUpdates(updateBSON);
@@ -632,6 +637,12 @@ boost::optional<repl::OplogEntry> Session::createMatchingTransactionTableUpdate(
newTxnRecord.setTxnNum(*sessionInfo.getTxnNumber());
newTxnRecord.setLastWriteOpTime(entry.getOpTime());
newTxnRecord.setLastWriteDate(*entry.getWallClockTime());
+
+ if (entry.isCommand() &&
+ entry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps) {
+ newTxnRecord.setState(entry.shouldPrepare() ? DurableTxnStateEnum::kPrepared
+ : DurableTxnStateEnum::kCommitted);
+ }
return newTxnRecord.toBSON();
}();
diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h
index 835d264f5a8..b617f954692 100644
--- a/src/mongo/db/session.h
+++ b/src/mongo/db/session.h
@@ -103,6 +103,8 @@ public:
* in the write's WUOW. Updates the on-disk state of the session to match the specified
* transaction/opTime and keeps the cached state in sync.
*
+ * 'txnState' is 'none' for retryable writes.
+ *
* Must only be called with the session checked-out.
*
* Throws if the session has been invalidated or the active transaction number doesn't match.
@@ -111,7 +113,8 @@ public:
TxnNumber txnNumber,
std::vector<StmtId> stmtIdsWritten,
const repl::OpTime& lastStmtIdWriteOpTime,
- Date_t lastStmtIdWriteDate);
+ Date_t lastStmtIdWriteDate,
+ boost::optional<DurableTxnStateEnum> txnState);
/**
* Helper function to begin a migration on a primary node.
@@ -236,7 +239,8 @@ private:
UpdateRequest _makeUpdateRequest(WithLock,
TxnNumber newTxnNumber,
const repl::OpTime& newLastWriteTs,
- Date_t newLastWriteDate) const;
+ Date_t newLastWriteDate,
+ boost::optional<DurableTxnStateEnum> newState) const;
void _registerUpdateCacheOnCommit(OperationContext* opCtx,
TxnNumber newTxnNumber,
diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp
index e32c60f9d1b..1b8845011c3 100644
--- a/src/mongo/db/session_test.cpp
+++ b/src/mongo/db/session_test.cpp
@@ -182,6 +182,52 @@ protected:
OplogSlot());
}
+ repl::OpTime writeTxnRecord(Session* session,
+ TxnNumber txnNum,
+ StmtId stmtId,
+ repl::OpTime prevOpTime,
+ boost::optional<DurableTxnStateEnum> txnState) {
+ session->beginOrContinueTxn(opCtx(), txnNum);
+
+ AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
+ WriteUnitOfWork wuow(opCtx());
+ const auto opTime =
+ logOp(opCtx(), kNss, session->getSessionId(), txnNum, stmtId, prevOpTime);
+ session->onWriteOpCompletedOnPrimary(
+ opCtx(), txnNum, {stmtId}, opTime, Date_t::now(), txnState);
+ wuow.commit();
+
+ return opTime;
+ }
+
+ void assertTxnRecord(Session* session,
+ TxnNumber txnNum,
+ StmtId stmtId,
+ repl::OpTime opTime,
+ boost::optional<DurableTxnStateEnum> txnState) {
+ DBDirectClient client(opCtx());
+ auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ {BSON("_id" << session->getSessionId().toBSON())});
+ ASSERT(cursor);
+ ASSERT(cursor->more());
+
+ auto txnRecordObj = cursor->next();
+ auto txnRecord = SessionTxnRecord::parse(
+ IDLParserErrorContext("SessionEntryWrittenAtFirstWrite"), txnRecordObj);
+ ASSERT(!cursor->more());
+ ASSERT_EQ(session->getSessionId(), txnRecord.getSessionId());
+ ASSERT_EQ(txnNum, txnRecord.getTxnNum());
+ ASSERT_EQ(opTime, txnRecord.getLastWriteOpTime());
+ ASSERT(txnRecord.getState() == txnState);
+ ASSERT_EQ(txnState != boost::none,
+ txnRecordObj.hasField(SessionTxnRecord::kStateFieldName));
+ ASSERT_EQ(opTime, session->getLastWriteOpTime(txnNum));
+
+ session->invalidate();
+ session->refreshFromStorageIfNeeded(opCtx());
+ ASSERT_EQ(opTime, session->getLastWriteOpTime(txnNum));
+ }
+
OpObserverMock* _opObserver = nullptr;
};
@@ -211,15 +257,7 @@ TEST_F(SessionTest, SessionEntryWrittenAtFirstWrite) {
const TxnNumber txnNum = 21;
session.beginOrContinueTxn(opCtx(), txnNum);
- const auto opTime = [&] {
- AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
- WriteUnitOfWork wuow(opCtx());
- const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0);
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now());
- wuow.commit();
-
- return opTime;
- }();
+ const auto opTime = writeTxnRecord(&session, txnNum, 0, {}, boost::none);
DBDirectClient client(opCtx());
auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(),
@@ -233,6 +271,7 @@ TEST_F(SessionTest, SessionEntryWrittenAtFirstWrite) {
ASSERT_EQ(sessionId, txnRecord.getSessionId());
ASSERT_EQ(txnNum, txnRecord.getTxnNum());
ASSERT_EQ(opTime, txnRecord.getLastWriteOpTime());
+ ASSERT(!txnRecord.getState());
ASSERT_EQ(opTime, session.getLastWriteOpTime(txnNum));
}
@@ -241,20 +280,8 @@ TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) {
Session session(sessionId);
session.refreshFromStorageIfNeeded(opCtx());
- const auto writeTxnRecordFn = [&](TxnNumber txnNum, StmtId stmtId, repl::OpTime prevOpTime) {
- session.beginOrContinueTxn(opCtx(), txnNum);
-
- AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
- WriteUnitOfWork wuow(opCtx());
- const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevOpTime);
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime, Date_t::now());
- wuow.commit();
-
- return opTime;
- };
-
- const auto firstOpTime = writeTxnRecordFn(100, 0, {});
- const auto secondOpTime = writeTxnRecordFn(200, 1, firstOpTime);
+ const auto firstOpTime = writeTxnRecord(&session, 100, 0, {}, boost::none);
+ const auto secondOpTime = writeTxnRecord(&session, 200, 1, firstOpTime, boost::none);
DBDirectClient client(opCtx());
auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(),
@@ -268,6 +295,7 @@ TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) {
ASSERT_EQ(sessionId, txnRecord.getSessionId());
ASSERT_EQ(200, txnRecord.getTxnNum());
ASSERT_EQ(secondOpTime, txnRecord.getLastWriteOpTime());
+ ASSERT(!txnRecord.getState());
ASSERT_EQ(secondOpTime, session.getLastWriteOpTime(200));
session.invalidate();
@@ -275,6 +303,23 @@ TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) {
ASSERT_EQ(secondOpTime, session.getLastWriteOpTime(200));
}
+TEST_F(SessionTest, TransactionTableUpdatesReplaceEntireDocument) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
+
+ const auto firstOpTime = writeTxnRecord(&session, 100, 0, {}, boost::none);
+ assertTxnRecord(&session, 100, 0, firstOpTime, boost::none);
+ const auto secondOpTime =
+ writeTxnRecord(&session, 200, 1, firstOpTime, DurableTxnStateEnum::kPrepared);
+ assertTxnRecord(&session, 200, 1, secondOpTime, DurableTxnStateEnum::kPrepared);
+ const auto thirdOpTime =
+ writeTxnRecord(&session, 300, 2, secondOpTime, DurableTxnStateEnum::kCommitted);
+ assertTxnRecord(&session, 300, 2, thirdOpTime, DurableTxnStateEnum::kCommitted);
+ const auto fourthOpTime = writeTxnRecord(&session, 400, 3, thirdOpTime, boost::none);
+ assertTxnRecord(&session, 400, 3, fourthOpTime, boost::none);
+}
+
TEST_F(SessionTest, StartingOldTxnShouldAssert) {
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
@@ -306,7 +351,8 @@ TEST_F(SessionTest, SessionTransactionsCollectionNotDefaultCreated) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0);
- ASSERT_THROWS(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()),
+ ASSERT_THROWS(session.onWriteOpCompletedOnPrimary(
+ opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none),
AssertionException);
}
@@ -318,25 +364,15 @@ TEST_F(SessionTest, CheckStatementExecuted) {
const TxnNumber txnNum = 100;
session.beginOrContinueTxn(opCtx(), txnNum);
- const auto writeTxnRecordFn = [&](StmtId stmtId, repl::OpTime prevOpTime) {
- AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
- WriteUnitOfWork wuow(opCtx());
- const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevOpTime);
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime, Date_t::now());
- wuow.commit();
-
- return opTime;
- };
-
ASSERT(!session.checkStatementExecuted(opCtx(), txnNum, 1000));
ASSERT(!session.checkStatementExecutedNoOplogEntryFetch(txnNum, 1000));
- const auto firstOpTime = writeTxnRecordFn(1000, {});
+ const auto firstOpTime = writeTxnRecord(&session, txnNum, 1000, {}, boost::none);
ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 1000));
ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 1000));
ASSERT(!session.checkStatementExecuted(opCtx(), txnNum, 2000));
ASSERT(!session.checkStatementExecutedNoOplogEntryFetch(txnNum, 2000));
- writeTxnRecordFn(2000, firstOpTime);
+ writeTxnRecord(&session, txnNum, 2000, firstOpTime, boost::none);
ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 2000));
ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 2000));
@@ -386,7 +422,8 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0);
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now());
+ session.onWriteOpCompletedOnPrimary(
+ opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none);
wuow.commit();
}
@@ -394,10 +431,10 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum - 1, 0);
- ASSERT_THROWS_CODE(
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum - 1, {0}, opTime, Date_t::now()),
- AssertionException,
- ErrorCodes::ConflictingOperationInProgress);
+ ASSERT_THROWS_CODE(session.onWriteOpCompletedOnPrimary(
+ opCtx(), txnNum - 1, {0}, opTime, Date_t::now(), boost::none),
+ AssertionException,
+ ErrorCodes::ConflictingOperationInProgress);
}
}
@@ -415,10 +452,10 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForInvalidatedTransactionThrows) {
session.invalidate();
- ASSERT_THROWS_CODE(
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()),
- AssertionException,
- ErrorCodes::ConflictingOperationInProgress);
+ ASSERT_THROWS_CODE(session.onWriteOpCompletedOnPrimary(
+ opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none),
+ AssertionException,
+ ErrorCodes::ConflictingOperationInProgress);
}
TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) {
@@ -433,7 +470,8 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0);
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now());
+ session.onWriteOpCompletedOnPrimary(
+ opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none);
session.invalidate();
@@ -543,7 +581,8 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) {
{},
false /* prepare */,
OplogSlot());
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {1}, opTime, wallClockTime);
+ session.onWriteOpCompletedOnPrimary(
+ opCtx(), txnNum, {1}, opTime, wallClockTime, boost::none);
wuow.commit();
return opTime;
@@ -573,7 +612,7 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) {
OplogSlot());
session.onWriteOpCompletedOnPrimary(
- opCtx(), txnNum, {kIncompleteHistoryStmtId}, opTime, wallClockTime);
+ opCtx(), txnNum, {kIncompleteHistoryStmtId}, opTime, wallClockTime, boost::none);
wuow.commit();
}
diff --git a/src/mongo/db/session_txn_record.idl b/src/mongo/db/session_txn_record.idl
index 0bd043860db..da56da3d24e 100644
--- a/src/mongo/db/session_txn_record.idl
+++ b/src/mongo/db/session_txn_record.idl
@@ -37,6 +37,15 @@ imports:
- "mongo/db/logical_session_id.idl"
- "mongo/db/repl/replication_types.idl"
+enums:
+ DurableTxnState:
+ description: "The state of the most recent durable transaction on a session"
+ type: string
+ values:
+ kPrepared: "prepared"
+ kCommitted: "committed"
+ kAborted: "aborted"
+
structs:
sessionTxnRecord:
description: "A document used for storing session transaction states."
@@ -57,3 +66,7 @@ structs:
type: date
description: "Wall clock time of the last write which happened on on this
transaction."
+ state:
+ type: DurableTxnState
+ optional: true # Retryable writes do not have a state field.
+ description: "The state of the most recent durable transaction on the session"