diff options
author | Judah Schvimer <judah@mongodb.com> | 2019-06-19 14:04:09 -0400 |
---|---|---|
committer | Judah Schvimer <judah@mongodb.com> | 2019-06-25 14:53:45 -0400 |
commit | aad978f94cc92f2bb22a9ed922a71a4ad7adfb93 (patch) | |
tree | 1da9401e1f3de25791a90d733e336db907c4c789 | |
parent | 4dba23de42733d08df9829e93e9d726fdf72cd59 (diff) | |
download | mongo-aad978f94cc92f2bb22a9ed922a71a4ad7adfb93.tar.gz |
SERVER-36529 OpObservers for session operations should pass SessionTxnRecord
(cherry picked from commit 58f209ca5ebe15ffbaec95b26f5b6a7b957f6510)
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 72 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl_test.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_destination.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 54 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.h | 15 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant_retryable_writes_test.cpp | 81 |
6 files changed, 122 insertions, 117 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 597030d47c4..a2abcc5ca26 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -114,26 +114,19 @@ repl::OpTime logOperation(OperationContext* opCtx, * oplog entry, the recursion will stop at this point. */ void onWriteOpCompleted(OperationContext* opCtx, - const NamespaceString& nss, std::vector<StmtId> stmtIdsWritten, - const repl::OpTime& lastStmtIdWriteOpTime, - Date_t lastStmtIdWriteDate, - boost::optional<DurableTxnStateEnum> txnState, - boost::optional<repl::OpTime> startOpTime) { - if (lastStmtIdWriteOpTime.isNull()) + SessionTxnRecord sessionTxnRecord) { + if (sessionTxnRecord.getLastWriteOpTime().isNull()) return; auto txnParticipant = TransactionParticipant::get(opCtx); if (!txnParticipant) return; - txnParticipant.onWriteOpCompletedOnPrimary(opCtx, - *opCtx->getTxnNumber(), - std::move(stmtIdsWritten), - lastStmtIdWriteOpTime, - lastStmtIdWriteDate, - txnState, - startOpTime); + // We add these here since they may not exist if we return early. + sessionTxnRecord.setSessionId(*opCtx->getLogicalSessionId()); + sessionTxnRecord.setTxnNum(*opCtx->getTxnNumber()); + txnParticipant.onWriteOpCompletedOnPrimary(opCtx, std::move(stmtIdsWritten), sessionTxnRecord); } /** @@ -505,13 +498,10 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, std::back_inserter(stmtIdsWritten), [](const InsertStatement& stmt) { return stmt.stmtId; }); - onWriteOpCompleted(opCtx, - nss, - stmtIdsWritten, - lastOpTime, - lastWriteDate, - boost::none, - boost::none /* startOpTime */); + SessionTxnRecord sessionTxnRecord; + sessionTxnRecord.setLastWriteOpTime(lastOpTime); + sessionTxnRecord.setLastWriteDate(lastWriteDate); + onWriteOpCompleted(opCtx, stmtIdsWritten, sessionTxnRecord); } size_t index = 0; @@ -568,13 +558,10 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg txnParticipant.addTransactionOperation(opCtx, operation); } else { opTime = replLogUpdate(opCtx, args); - onWriteOpCompleted(opCtx, - args.nss, - std::vector<StmtId>{args.updateArgs.stmtId}, - opTime.writeOpTime, - opTime.wallClockTime, - boost::none, - boost::none /* startOpTime */); + SessionTxnRecord sessionTxnRecord; + sessionTxnRecord.setLastWriteOpTime(opTime.writeOpTime); + sessionTxnRecord.setLastWriteDate(opTime.wallClockTime); + onWriteOpCompleted(opCtx, std::vector<StmtId>{args.updateArgs.stmtId}, sessionTxnRecord); } if (args.nss != NamespaceString::kSessionTransactionsTableNamespace) { @@ -632,13 +619,10 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, txnParticipant.addTransactionOperation(opCtx, operation); } else { opTime = replLogDelete(opCtx, nss, uuid, stmtId, fromMigrate, deletedDoc); - onWriteOpCompleted(opCtx, - nss, - std::vector<StmtId>{stmtId}, - opTime.writeOpTime, - opTime.wallClockTime, - boost::none, - boost::none /* startOpTime */); + SessionTxnRecord sessionTxnRecord; + sessionTxnRecord.setLastWriteOpTime(opTime.writeOpTime); + sessionTxnRecord.setLastWriteDate(opTime.wallClockTime); + onWriteOpCompleted(opCtx, std::vector<StmtId>{stmtId}, sessionTxnRecord); } if (nss != NamespaceString::kSessionTransactionsTableNamespace) { @@ -1066,8 +1050,12 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, }(); if (updateTxnTable) { - onWriteOpCompleted( - opCtx, cmdNss, {}, times.writeOpTime, times.wallClockTime, txnState, startOpTime); + SessionTxnRecord sessionTxnRecord; + sessionTxnRecord.setLastWriteOpTime(times.writeOpTime); + sessionTxnRecord.setLastWriteDate(times.wallClockTime); + sessionTxnRecord.setState(txnState); + sessionTxnRecord.setStartOpTime(startOpTime); + onWriteOpCompleted(opCtx, {}, sessionTxnRecord); } return times; } catch (const AssertionException& e) { @@ -1261,13 +1249,11 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx, oplogSlot); invariant(oplogSlot.isNull() || oplogSlot == oplogOpTime); - onWriteOpCompleted(opCtx, - cmdNss, - {}, - oplogOpTime, - wallClockTime, - durableState, - boost::none /* startOpTime */); + SessionTxnRecord sessionTxnRecord; + sessionTxnRecord.setLastWriteOpTime(oplogOpTime); + sessionTxnRecord.setLastWriteDate(wallClockTime); + sessionTxnRecord.setState(durableState); + onWriteOpCompleted(opCtx, {}, sessionTxnRecord); wuow.commit(); }); } diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index 3fd8661a42c..a9596e623ec 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -455,8 +455,12 @@ public: AutoGetCollection autoColl(opCtx, nss, MODE_IX); WriteUnitOfWork wuow(opCtx); auto opTime = repl::OpTime(Timestamp(10, 1), 1); // Dummy timestamp. - txnParticipant.onWriteOpCompletedOnPrimary( - opCtx, txnNum, {stmtId}, opTime, Date_t::now(), boost::none, boost::none); + SessionTxnRecord sessionTxnRecord; + sessionTxnRecord.setSessionId(*opCtx->getLogicalSessionId()); + sessionTxnRecord.setTxnNum(txnNum); + sessionTxnRecord.setLastWriteOpTime(opTime); + sessionTxnRecord.setLastWriteDate(Date_t::now()); + txnParticipant.onWriteOpCompletedOnPrimary(opCtx, {stmtId}, sessionTxnRecord); wuow.commit(); } } diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index 1a900129e2f..fd6a3086dd7 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -318,8 +318,13 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON, // Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post image, because the // next oplog will contain the real operation if (!result.isPrePostImage) { - txnParticipant.onMigrateCompletedOnPrimary( - opCtx, result.txnNum, {stmtId}, oplogOpTime, *oplogEntry.getWallClockTime()); + SessionTxnRecord sessionTxnRecord; + sessionTxnRecord.setSessionId(result.sessionId); + sessionTxnRecord.setTxnNum(result.txnNum); + sessionTxnRecord.setLastWriteOpTime(oplogOpTime); + sessionTxnRecord.setLastWriteDate(*oplogEntry.getWallClockTime()); + // We do not migrate transaction oplog entries so don't set the txn state. + txnParticipant.onMigrateCompletedOnPrimary(opCtx, {stmtId}, sessionTxnRecord); } wunit.commit(); diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index b6f36e7d7ab..72bbe909760 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -1956,51 +1956,48 @@ void TransactionParticipant::Participant::refreshFromStorageIfNeeded(OperationCo void TransactionParticipant::Participant::onWriteOpCompletedOnPrimary( OperationContext* opCtx, - TxnNumber txnNumber, std::vector<StmtId> stmtIdsWritten, - const repl::OpTime& lastStmtIdWriteOpTime, - Date_t lastStmtIdWriteDate, - boost::optional<DurableTxnStateEnum> txnState, - boost::optional<repl::OpTime> startOpTime) { + const SessionTxnRecord& sessionTxnRecord) { invariant(opCtx->lockState()->inAWriteUnitOfWork()); - invariant(txnNumber == o().activeTxnNumber); + invariant(sessionTxnRecord.getSessionId() == _sessionId()); + invariant(sessionTxnRecord.getTxnNum() == o().activeTxnNumber); // Sanity check that we don't double-execute statements for (const auto stmtId : stmtIdsWritten) { const auto stmtOpTime = _checkStatementExecuted(stmtId); if (stmtOpTime) { - fassertOnRepeatedExecution( - _sessionId(), txnNumber, stmtId, *stmtOpTime, lastStmtIdWriteOpTime); + fassertOnRepeatedExecution(_sessionId(), + sessionTxnRecord.getTxnNum(), + stmtId, + *stmtOpTime, + sessionTxnRecord.getLastWriteOpTime()); } } - const auto updateRequest = - _makeUpdateRequest(lastStmtIdWriteOpTime, lastStmtIdWriteDate, txnState, startOpTime); + const auto updateRequest = _makeUpdateRequest(sessionTxnRecord); repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx); updateSessionEntry(opCtx, updateRequest); - _registerUpdateCacheOnCommit(opCtx, std::move(stmtIdsWritten), lastStmtIdWriteOpTime); + _registerUpdateCacheOnCommit( + opCtx, std::move(stmtIdsWritten), sessionTxnRecord.getLastWriteOpTime()); } void TransactionParticipant::Participant::onMigrateCompletedOnPrimary( OperationContext* opCtx, - TxnNumber txnNumber, std::vector<StmtId> stmtIdsWritten, - const repl::OpTime& lastStmtIdWriteOpTime, - Date_t oplogLastStmtIdWriteDate) { + const SessionTxnRecord& sessionTxnRecord) { invariant(opCtx->lockState()->inAWriteUnitOfWork()); - invariant(txnNumber == o().activeTxnNumber); + invariant(sessionTxnRecord.getSessionId() == _sessionId()); + invariant(sessionTxnRecord.getTxnNum() == o().activeTxnNumber); - // We do not migrate transaction oplog entries so don't set the txn state - const auto txnState = boost::none; - const auto updateRequest = _makeUpdateRequest( - lastStmtIdWriteOpTime, oplogLastStmtIdWriteDate, txnState, boost::none /* startOpTime */); + const auto updateRequest = _makeUpdateRequest(sessionTxnRecord); repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx); updateSessionEntry(opCtx, updateRequest); - _registerUpdateCacheOnCommit(opCtx, std::move(stmtIdsWritten), lastStmtIdWriteOpTime); + _registerUpdateCacheOnCommit( + opCtx, std::move(stmtIdsWritten), sessionTxnRecord.getLastWriteOpTime()); } void TransactionParticipant::Participant::_invalidate(WithLock wl) { @@ -2114,23 +2111,10 @@ boost::optional<repl::OpTime> TransactionParticipant::Participant::_checkStateme } UpdateRequest TransactionParticipant::Participant::_makeUpdateRequest( - const repl::OpTime& newLastWriteOpTime, - Date_t newLastWriteDate, - boost::optional<DurableTxnStateEnum> newState, - boost::optional<repl::OpTime> startOpTime) const { + const SessionTxnRecord& sessionTxnRecord) const { UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace); - const auto updateBSON = [&] { - SessionTxnRecord newTxnRecord; - newTxnRecord.setSessionId(_sessionId()); - newTxnRecord.setTxnNum(o().activeTxnNumber); - newTxnRecord.setLastWriteOpTime(newLastWriteOpTime); - newTxnRecord.setLastWriteDate(newLastWriteDate); - newTxnRecord.setState(newState); - newTxnRecord.setStartOpTime(startOpTime); - return newTxnRecord.toBSON(); - }(); - updateRequest.setUpdateModification(updateBSON); + updateRequest.setUpdateModification(sessionTxnRecord.toBSON()); updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName << _sessionId().toBSON())); updateRequest.setUpsert(true); diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index 2120c56c3cd..77f9b778c08 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -566,12 +566,8 @@ public: * match. */ void onWriteOpCompletedOnPrimary(OperationContext* opCtx, - TxnNumber txnNumber, std::vector<StmtId> stmtIdsWritten, - const repl::OpTime& lastStmtIdWriteOpTime, - Date_t lastStmtIdWriteDate, - boost::optional<DurableTxnStateEnum> txnState, - boost::optional<repl::OpTime> startOpTime); + const SessionTxnRecord& sessionTxnRecord); /** * Called after an entry for the specified session and transaction has been written to the @@ -583,10 +579,8 @@ public: * the one specified. */ void onMigrateCompletedOnPrimary(OperationContext* opCtx, - TxnNumber txnNumber, std::vector<StmtId> stmtIdsWritten, - const repl::OpTime& lastStmtIdWriteOpTime, - Date_t oplogLastStmtIdWriteDate); + const SessionTxnRecord& sessionTxnRecord); /** * Checks whether the given statementId for the specified transaction has already executed @@ -677,10 +671,7 @@ public: private: boost::optional<repl::OpTime> _checkStatementExecuted(StmtId stmtId) const; - UpdateRequest _makeUpdateRequest(const repl::OpTime& newLastWriteOpTime, - Date_t newLastWriteDate, - boost::optional<DurableTxnStateEnum> newState, - boost::optional<repl::OpTime> startOpTime) const; + UpdateRequest _makeUpdateRequest(const SessionTxnRecord& sessionTxnRecord) const; void _registerUpdateCacheOnCommit(OperationContext* opCtx, std::vector<StmtId> stmtIdsWritten, diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp index 897c883c254..c1644f39918 100644 --- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp +++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp @@ -235,8 +235,14 @@ protected: WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, uuid, session->getSessionId(), txnNum, stmtId, prevOpTime); - txnParticipant.onWriteOpCompletedOnPrimary( - opCtx(), txnNum, {stmtId}, opTime, Date_t::now(), txnState, boost::none); + + SessionTxnRecord sessionTxnRecord; + sessionTxnRecord.setSessionId(session->getSessionId()); + sessionTxnRecord.setTxnNum(txnNum); + sessionTxnRecord.setLastWriteOpTime(opTime); + sessionTxnRecord.setLastWriteDate(Date_t::now()); + sessionTxnRecord.setState(txnState); + txnParticipant.onWriteOpCompletedOnPrimary(opCtx(), {stmtId}, sessionTxnRecord); wuow.commit(); return opTime; @@ -395,8 +401,12 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionTransactionsCollectionN const auto uuid = UUID::gen(); const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, 0); - ASSERT_THROWS(txnParticipant.onWriteOpCompletedOnPrimary( - opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none, boost::none), + SessionTxnRecord sessionTxnRecord; + sessionTxnRecord.setSessionId(sessionId); + sessionTxnRecord.setTxnNum(txnNum); + sessionTxnRecord.setLastWriteOpTime(opTime); + sessionTxnRecord.setLastWriteDate(Date_t::now()); + ASSERT_THROWS(txnParticipant.onWriteOpCompletedOnPrimary(opCtx(), {0}, sessionTxnRecord), AssertionException); } @@ -440,7 +450,7 @@ DEATH_TEST_F(TransactionParticipantRetryableWritesTest, DEATH_TEST_F(TransactionParticipantRetryableWritesTest, WriteOpCompletedOnPrimaryForOldTransactionInvariants, - "Invariant failure txnNumber == o().activeTxnNumber") { + "Invariant failure sessionTxnRecord.getTxnNum() == o().activeTxnNumber") { auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.refreshFromStorageIfNeeded(opCtx()); @@ -454,8 +464,13 @@ DEATH_TEST_F(TransactionParticipantRetryableWritesTest, AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, 0); - txnParticipant.onWriteOpCompletedOnPrimary( - opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none, boost::none); + + SessionTxnRecord sessionTxnRecord; + sessionTxnRecord.setSessionId(sessionId); + sessionTxnRecord.setTxnNum(txnNum); + sessionTxnRecord.setLastWriteOpTime(opTime); + sessionTxnRecord.setLastWriteDate(Date_t::now()); + txnParticipant.onWriteOpCompletedOnPrimary(opCtx(), {0}, sessionTxnRecord); wuow.commit(); } @@ -463,14 +478,19 @@ DEATH_TEST_F(TransactionParticipantRetryableWritesTest, AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum - 1, 0); - txnParticipant.onWriteOpCompletedOnPrimary( - opCtx(), txnNum - 1, {0}, opTime, Date_t::now(), boost::none, boost::none); + + SessionTxnRecord sessionTxnRecord; + sessionTxnRecord.setSessionId(sessionId); + sessionTxnRecord.setTxnNum(txnNum - 1); + sessionTxnRecord.setLastWriteOpTime(opTime); + sessionTxnRecord.setLastWriteDate(Date_t::now()); + txnParticipant.onWriteOpCompletedOnPrimary(opCtx(), {0}, sessionTxnRecord); } } DEATH_TEST_F(TransactionParticipantRetryableWritesTest, WriteOpCompletedOnPrimaryForInvalidatedTransactionInvariants, - "Invariant failure txnNumber == o().activeTxnNumber") { + "Invariant failure sessionTxnRecord.getTxnNum() == o().activeTxnNumber") { auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.refreshFromStorageIfNeeded(opCtx()); @@ -483,8 +503,13 @@ DEATH_TEST_F(TransactionParticipantRetryableWritesTest, const auto opTime = logOp(opCtx(), kNss, uuid, *opCtx()->getLogicalSessionId(), txnNum, 0); txnParticipant.invalidate(opCtx()); - txnParticipant.onWriteOpCompletedOnPrimary( - opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none, boost::none); + + SessionTxnRecord sessionTxnRecord; + sessionTxnRecord.setSessionId(*opCtx()->getLogicalSessionId()); + sessionTxnRecord.setTxnNum(txnNum); + sessionTxnRecord.setLastWriteOpTime(opTime); + sessionTxnRecord.setLastWriteDate(Date_t::now()); + txnParticipant.onWriteOpCompletedOnPrimary(opCtx(), {0}, sessionTxnRecord); } TEST_F(TransactionParticipantRetryableWritesTest, IncompleteHistoryDueToOpLogTruncation) { @@ -586,8 +611,13 @@ TEST_F(TransactionParticipantRetryableWritesTest, ErrorOnlyWhenStmtIdBeingChecke 1, {}, OplogSlot()); - txnParticipant.onWriteOpCompletedOnPrimary( - opCtx(), txnNum, {1}, opTime, wallClockTime, boost::none, boost::none); + + SessionTxnRecord sessionTxnRecord; + sessionTxnRecord.setSessionId(sessionId); + sessionTxnRecord.setTxnNum(txnNum); + sessionTxnRecord.setLastWriteOpTime(opTime); + sessionTxnRecord.setLastWriteDate(Date_t::now()); + txnParticipant.onWriteOpCompletedOnPrimary(opCtx(), {1}, sessionTxnRecord); wuow.commit(); return opTime; @@ -615,13 +645,13 @@ TEST_F(TransactionParticipantRetryableWritesTest, ErrorOnlyWhenStmtIdBeingChecke link, OplogSlot()); - txnParticipant.onWriteOpCompletedOnPrimary(opCtx(), - txnNum, - {kIncompleteHistoryStmtId}, - opTime, - wallClockTime, - boost::none, - boost::none); + SessionTxnRecord sessionTxnRecord; + sessionTxnRecord.setSessionId(sessionId); + sessionTxnRecord.setTxnNum(txnNum); + sessionTxnRecord.setLastWriteOpTime(opTime); + sessionTxnRecord.setLastWriteDate(Date_t::now()); + txnParticipant.onWriteOpCompletedOnPrimary( + opCtx(), {kIncompleteHistoryStmtId}, sessionTxnRecord); wuow.commit(); } @@ -677,8 +707,13 @@ TEST_F(ShardTxnParticipantRetryableWritesTest, AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, 0); - txnParticipant.onWriteOpCompletedOnPrimary( - opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none, boost::none); + + SessionTxnRecord sessionTxnRecord; + sessionTxnRecord.setSessionId(sessionId); + sessionTxnRecord.setTxnNum(txnNum); + sessionTxnRecord.setLastWriteOpTime(opTime); + sessionTxnRecord.setLastWriteDate(Date_t::now()); + txnParticipant.onWriteOpCompletedOnPrimary(opCtx(), {0}, sessionTxnRecord); wuow.commit(); } |