diff options
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 18 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl_test.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/session.cpp | 21 | ||||
-rw-r--r-- | src/mongo/db/session.h | 8 | ||||
-rw-r--r-- | src/mongo/db/session_test.cpp | 135 | ||||
-rw-r--r-- | src/mongo/db/session_txn_record.idl | 13 |
7 files changed, 139 insertions, 64 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index ccf3bc6e138..9117ce32e6c 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -108,7 +108,8 @@ void onWriteOpCompleted(OperationContext* opCtx, Session* session, std::vector<StmtId> stmtIdsWritten, const repl::OpTime& lastStmtIdWriteOpTime, - Date_t lastStmtIdWriteDate) { + Date_t lastStmtIdWriteDate, + boost::optional<DurableTxnStateEnum> txnState) { if (lastStmtIdWriteOpTime.isNull()) return; @@ -117,7 +118,8 @@ void onWriteOpCompleted(OperationContext* opCtx, *opCtx->getTxnNumber(), std::move(stmtIdsWritten), lastStmtIdWriteOpTime, - lastStmtIdWriteDate); + lastStmtIdWriteDate, + txnState); } } @@ -412,7 +414,8 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, std::back_inserter(stmtIdsWritten), [](const InsertStatement& stmt) { return stmt.stmtId; }); - onWriteOpCompleted(opCtx, nss, session, stmtIdsWritten, lastOpTime, lastWriteDate); + onWriteOpCompleted( + opCtx, nss, session, stmtIdsWritten, lastOpTime, lastWriteDate, boost::none); } auto* const css = (nss == NamespaceString::kSessionTransactionsTableNamespace || fromMigrate) @@ -482,7 +485,8 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg session, std::vector<StmtId>{args.stmtId}, opTime.writeOpTime, - opTime.wallClockTime); + opTime.wallClockTime, + boost::none); } AuthorizationManager::get(opCtx->getServiceContext()) @@ -541,7 +545,8 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, session, std::vector<StmtId>{stmtId}, opTime.writeOpTime, - opTime.wallClockTime); + opTime.wallClockTime, + boost::none); } AuthorizationManager::get(opCtx->getServiceContext()) @@ -954,8 +959,9 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, auto times = replLogApplyOps( opCtx, cmdNss, applyOpCmd, sessionInfo, stmtId, oplogLink, prepare, prepareOplogSlot); + auto txnState = prepare ? DurableTxnStateEnum::kPrepared : DurableTxnStateEnum::kCommitted; onWriteOpCompleted( - opCtx, cmdNss, session, {stmtId}, times.writeOpTime, times.wallClockTime); + opCtx, cmdNss, session, {stmtId}, times.writeOpTime, times.wallClockTime, txnState); return times; } catch (const AssertionException& e) { // Change the error code to TransactionTooLarge if it is BSONObjectTooLarge. diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index 3bf8a4bf967..2fb8003bf46 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -320,7 +320,8 @@ public: AutoGetCollection autoColl(opCtx, nss, MODE_IX); WriteUnitOfWork wuow(opCtx); auto opTime = repl::OpTime(Timestamp(10, 1), 1); // Dummy timestamp. - session->onWriteOpCompletedOnPrimary(opCtx, txnNum, {stmtId}, opTime, Date_t::now()); + session->onWriteOpCompletedOnPrimary( + opCtx, txnNum, {stmtId}, opTime, Date_t::now(), boost::none); wuow.commit(); } } diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index ad8f5ee1848..5e7c7b57742 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -545,8 +545,9 @@ void fillWriterVectors(OperationContext* opCtx, } try { derivedOps->emplace_back(ApplyOps::extractOperations(op)); - fillWriterVectors( - opCtx, &derivedOps->back(), writerVectors, derivedOps, sessionUpdateTracker); + + // Nested entries cannot have different session updates. + fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); } catch (...) { fassertFailedWithStatusNoTrace( 50711, diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index 217aa0bab62..b7478ce0ee3 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -285,7 +285,8 @@ void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx, TxnNumber txnNumber, std::vector<StmtId> stmtIdsWritten, const repl::OpTime& lastStmtIdWriteOpTime, - Date_t lastStmtIdWriteDate) { + Date_t lastStmtIdWriteDate, + boost::optional<DurableTxnStateEnum> txnState) { invariant(opCtx->lockState()->inAWriteUnitOfWork()); stdx::unique_lock<stdx::mutex> ul(_mutex); @@ -300,7 +301,7 @@ void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx, } const auto updateRequest = - _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, lastStmtIdWriteDate); + _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, lastStmtIdWriteDate, txnState); ul.unlock(); @@ -354,8 +355,10 @@ void Session::onMigrateCompletedOnPrimary(OperationContext* opCtx, const auto updatedLastStmtIdWriteDate = txnLastStmtIdWriteDate == Date_t::min() ? oplogLastStmtIdWriteDate : txnLastStmtIdWriteDate; - const auto updateRequest = - _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, updatedLastStmtIdWriteDate); + // We do not migrate transaction oplog entries. + auto txnState = boost::none; + const auto updateRequest = _makeUpdateRequest( + ul, txnNumber, lastStmtIdWriteOpTime, updatedLastStmtIdWriteDate, txnState); ul.unlock(); @@ -519,7 +522,8 @@ Date_t Session::_getLastWriteDate(WithLock wl, TxnNumber txnNumber) const { UpdateRequest Session::_makeUpdateRequest(WithLock, TxnNumber newTxnNumber, const repl::OpTime& newLastWriteOpTime, - Date_t newLastWriteDate) const { + Date_t newLastWriteDate, + boost::optional<DurableTxnStateEnum> newState) const { UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace); const auto updateBSON = [&] { @@ -528,6 +532,7 @@ UpdateRequest Session::_makeUpdateRequest(WithLock, newTxnRecord.setTxnNum(newTxnNumber); newTxnRecord.setLastWriteOpTime(newLastWriteOpTime); newTxnRecord.setLastWriteDate(newLastWriteDate); + newTxnRecord.setState(newState); return newTxnRecord.toBSON(); }(); updateRequest.setUpdates(updateBSON); @@ -632,6 +637,12 @@ boost::optional<repl::OplogEntry> Session::createMatchingTransactionTableUpdate( newTxnRecord.setTxnNum(*sessionInfo.getTxnNumber()); newTxnRecord.setLastWriteOpTime(entry.getOpTime()); newTxnRecord.setLastWriteDate(*entry.getWallClockTime()); + + if (entry.isCommand() && + entry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps) { + newTxnRecord.setState(entry.shouldPrepare() ? DurableTxnStateEnum::kPrepared + : DurableTxnStateEnum::kCommitted); + } return newTxnRecord.toBSON(); }(); diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h index 835d264f5a8..b617f954692 100644 --- a/src/mongo/db/session.h +++ b/src/mongo/db/session.h @@ -103,6 +103,8 @@ public: * in the write's WUOW. Updates the on-disk state of the session to match the specified * transaction/opTime and keeps the cached state in sync. * + * 'txnState' is 'none' for retryable writes. + * * Must only be called with the session checked-out. * * Throws if the session has been invalidated or the active transaction number doesn't match. @@ -111,7 +113,8 @@ public: TxnNumber txnNumber, std::vector<StmtId> stmtIdsWritten, const repl::OpTime& lastStmtIdWriteOpTime, - Date_t lastStmtIdWriteDate); + Date_t lastStmtIdWriteDate, + boost::optional<DurableTxnStateEnum> txnState); /** * Helper function to begin a migration on a primary node. @@ -236,7 +239,8 @@ private: UpdateRequest _makeUpdateRequest(WithLock, TxnNumber newTxnNumber, const repl::OpTime& newLastWriteTs, - Date_t newLastWriteDate) const; + Date_t newLastWriteDate, + boost::optional<DurableTxnStateEnum> newState) const; void _registerUpdateCacheOnCommit(OperationContext* opCtx, TxnNumber newTxnNumber, diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp index e32c60f9d1b..1b8845011c3 100644 --- a/src/mongo/db/session_test.cpp +++ b/src/mongo/db/session_test.cpp @@ -182,6 +182,52 @@ protected: OplogSlot()); } + repl::OpTime writeTxnRecord(Session* session, + TxnNumber txnNum, + StmtId stmtId, + repl::OpTime prevOpTime, + boost::optional<DurableTxnStateEnum> txnState) { + session->beginOrContinueTxn(opCtx(), txnNum); + + AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); + WriteUnitOfWork wuow(opCtx()); + const auto opTime = + logOp(opCtx(), kNss, session->getSessionId(), txnNum, stmtId, prevOpTime); + session->onWriteOpCompletedOnPrimary( + opCtx(), txnNum, {stmtId}, opTime, Date_t::now(), txnState); + wuow.commit(); + + return opTime; + } + + void assertTxnRecord(Session* session, + TxnNumber txnNum, + StmtId stmtId, + repl::OpTime opTime, + boost::optional<DurableTxnStateEnum> txnState) { + DBDirectClient client(opCtx()); + auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), + {BSON("_id" << session->getSessionId().toBSON())}); + ASSERT(cursor); + ASSERT(cursor->more()); + + auto txnRecordObj = cursor->next(); + auto txnRecord = SessionTxnRecord::parse( + IDLParserErrorContext("SessionEntryWrittenAtFirstWrite"), txnRecordObj); + ASSERT(!cursor->more()); + ASSERT_EQ(session->getSessionId(), txnRecord.getSessionId()); + ASSERT_EQ(txnNum, txnRecord.getTxnNum()); + ASSERT_EQ(opTime, txnRecord.getLastWriteOpTime()); + ASSERT(txnRecord.getState() == txnState); + ASSERT_EQ(txnState != boost::none, + txnRecordObj.hasField(SessionTxnRecord::kStateFieldName)); + ASSERT_EQ(opTime, session->getLastWriteOpTime(txnNum)); + + session->invalidate(); + session->refreshFromStorageIfNeeded(opCtx()); + ASSERT_EQ(opTime, session->getLastWriteOpTime(txnNum)); + } + OpObserverMock* _opObserver = nullptr; }; @@ -211,15 +257,7 @@ TEST_F(SessionTest, SessionEntryWrittenAtFirstWrite) { const TxnNumber txnNum = 21; session.beginOrContinueTxn(opCtx(), txnNum); - const auto opTime = [&] { - AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); - WriteUnitOfWork wuow(opCtx()); - const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()); - wuow.commit(); - - return opTime; - }(); + const auto opTime = writeTxnRecord(&session, txnNum, 0, {}, boost::none); DBDirectClient client(opCtx()); auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), @@ -233,6 +271,7 @@ TEST_F(SessionTest, SessionEntryWrittenAtFirstWrite) { ASSERT_EQ(sessionId, txnRecord.getSessionId()); ASSERT_EQ(txnNum, txnRecord.getTxnNum()); ASSERT_EQ(opTime, txnRecord.getLastWriteOpTime()); + ASSERT(!txnRecord.getState()); ASSERT_EQ(opTime, session.getLastWriteOpTime(txnNum)); } @@ -241,20 +280,8 @@ TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) { Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); - const auto writeTxnRecordFn = [&](TxnNumber txnNum, StmtId stmtId, repl::OpTime prevOpTime) { - session.beginOrContinueTxn(opCtx(), txnNum); - - AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); - WriteUnitOfWork wuow(opCtx()); - const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevOpTime); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime, Date_t::now()); - wuow.commit(); - - return opTime; - }; - - const auto firstOpTime = writeTxnRecordFn(100, 0, {}); - const auto secondOpTime = writeTxnRecordFn(200, 1, firstOpTime); + const auto firstOpTime = writeTxnRecord(&session, 100, 0, {}, boost::none); + const auto secondOpTime = writeTxnRecord(&session, 200, 1, firstOpTime, boost::none); DBDirectClient client(opCtx()); auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), @@ -268,6 +295,7 @@ TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) { ASSERT_EQ(sessionId, txnRecord.getSessionId()); ASSERT_EQ(200, txnRecord.getTxnNum()); ASSERT_EQ(secondOpTime, txnRecord.getLastWriteOpTime()); + ASSERT(!txnRecord.getState()); ASSERT_EQ(secondOpTime, session.getLastWriteOpTime(200)); session.invalidate(); @@ -275,6 +303,23 @@ TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) { ASSERT_EQ(secondOpTime, session.getLastWriteOpTime(200)); } +TEST_F(SessionTest, TransactionTableUpdatesReplaceEntireDocument) { + const auto sessionId = makeLogicalSessionIdForTest(); + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); + + const auto firstOpTime = writeTxnRecord(&session, 100, 0, {}, boost::none); + assertTxnRecord(&session, 100, 0, firstOpTime, boost::none); + const auto secondOpTime = + writeTxnRecord(&session, 200, 1, firstOpTime, DurableTxnStateEnum::kPrepared); + assertTxnRecord(&session, 200, 1, secondOpTime, DurableTxnStateEnum::kPrepared); + const auto thirdOpTime = + writeTxnRecord(&session, 300, 2, secondOpTime, DurableTxnStateEnum::kCommitted); + assertTxnRecord(&session, 300, 2, thirdOpTime, DurableTxnStateEnum::kCommitted); + const auto fourthOpTime = writeTxnRecord(&session, 400, 3, thirdOpTime, boost::none); + assertTxnRecord(&session, 400, 3, fourthOpTime, boost::none); +} + TEST_F(SessionTest, StartingOldTxnShouldAssert) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); @@ -306,7 +351,8 @@ TEST_F(SessionTest, SessionTransactionsCollectionNotDefaultCreated) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); - ASSERT_THROWS(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()), + ASSERT_THROWS(session.onWriteOpCompletedOnPrimary( + opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none), AssertionException); } @@ -318,25 +364,15 @@ TEST_F(SessionTest, CheckStatementExecuted) { const TxnNumber txnNum = 100; session.beginOrContinueTxn(opCtx(), txnNum); - const auto writeTxnRecordFn = [&](StmtId stmtId, repl::OpTime prevOpTime) { - AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); - WriteUnitOfWork wuow(opCtx()); - const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevOpTime); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime, Date_t::now()); - wuow.commit(); - - return opTime; - }; - ASSERT(!session.checkStatementExecuted(opCtx(), txnNum, 1000)); ASSERT(!session.checkStatementExecutedNoOplogEntryFetch(txnNum, 1000)); - const auto firstOpTime = writeTxnRecordFn(1000, {}); + const auto firstOpTime = writeTxnRecord(&session, txnNum, 1000, {}, boost::none); ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 1000)); ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 1000)); ASSERT(!session.checkStatementExecuted(opCtx(), txnNum, 2000)); ASSERT(!session.checkStatementExecutedNoOplogEntryFetch(txnNum, 2000)); - writeTxnRecordFn(2000, firstOpTime); + writeTxnRecord(&session, txnNum, 2000, firstOpTime, boost::none); ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 2000)); ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 2000)); @@ -386,7 +422,8 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()); + session.onWriteOpCompletedOnPrimary( + opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none); wuow.commit(); } @@ -394,10 +431,10 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum - 1, 0); - ASSERT_THROWS_CODE( - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum - 1, {0}, opTime, Date_t::now()), - AssertionException, - ErrorCodes::ConflictingOperationInProgress); + ASSERT_THROWS_CODE(session.onWriteOpCompletedOnPrimary( + opCtx(), txnNum - 1, {0}, opTime, Date_t::now(), boost::none), + AssertionException, + ErrorCodes::ConflictingOperationInProgress); } } @@ -415,10 +452,10 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForInvalidatedTransactionThrows) { session.invalidate(); - ASSERT_THROWS_CODE( - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()), - AssertionException, - ErrorCodes::ConflictingOperationInProgress); + ASSERT_THROWS_CODE(session.onWriteOpCompletedOnPrimary( + opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none), + AssertionException, + ErrorCodes::ConflictingOperationInProgress); } TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) { @@ -433,7 +470,8 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()); + session.onWriteOpCompletedOnPrimary( + opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none); session.invalidate(); @@ -543,7 +581,8 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) { {}, false /* prepare */, OplogSlot()); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {1}, opTime, wallClockTime); + session.onWriteOpCompletedOnPrimary( + opCtx(), txnNum, {1}, opTime, wallClockTime, boost::none); wuow.commit(); return opTime; @@ -573,7 +612,7 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) { OplogSlot()); session.onWriteOpCompletedOnPrimary( - opCtx(), txnNum, {kIncompleteHistoryStmtId}, opTime, wallClockTime); + opCtx(), txnNum, {kIncompleteHistoryStmtId}, opTime, wallClockTime, boost::none); wuow.commit(); } diff --git a/src/mongo/db/session_txn_record.idl b/src/mongo/db/session_txn_record.idl index 0bd043860db..da56da3d24e 100644 --- a/src/mongo/db/session_txn_record.idl +++ b/src/mongo/db/session_txn_record.idl @@ -37,6 +37,15 @@ imports: - "mongo/db/logical_session_id.idl" - "mongo/db/repl/replication_types.idl" +enums: + DurableTxnState: + description: "The state of the most recent durable transaction on a session" + type: string + values: + kPrepared: "prepared" + kCommitted: "committed" + kAborted: "aborted" + structs: sessionTxnRecord: description: "A document used for storing session transaction states." @@ -57,3 +66,7 @@ structs: type: date description: "Wall clock time of the last write which happened on on this transaction." + state: + type: DurableTxnState + optional: true # Retryable writes do not have a state field. + description: "The state of the most recent durable transaction on the session" |