diff options
author | Jason Chan <jason.chan@10gen.com> | 2019-03-25 14:04:50 -0400 |
---|---|---|
committer | Jason Chan <jason.chan@10gen.com> | 2019-03-25 14:18:15 -0400 |
commit | f8bf360e86ff96a0636575af7d1cee8ae9f0c3f0 (patch) | |
tree | fca8003369076ab69c7be139e8d726538e78e944 | |
parent | 35374f25e2bc3fca9b42958ffc98032cf31f53a1 (diff) | |
download | mongo-f8bf360e86ff96a0636575af7d1cee8ae9f0c3f0.tar.gz |
SERVER-39792 Update the txn table only on the first txn operation on primary
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 82 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl_test.cpp | 34 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 21 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.h | 6 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant_retryable_writes_test.cpp | 21 | ||||
-rw-r--r-- | src/mongo/dbtests/storage_timestamp_tests.cpp | 6 |
6 files changed, 128 insertions, 42 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index d12bfe0548b..e05ebcec0f9 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -123,7 +123,8 @@ void onWriteOpCompleted(OperationContext* opCtx, std::vector<StmtId> stmtIdsWritten, const repl::OpTime& lastStmtIdWriteOpTime, Date_t lastStmtIdWriteDate, - boost::optional<DurableTxnStateEnum> txnState) { + boost::optional<DurableTxnStateEnum> txnState, + boost::optional<repl::OpTime> startOpTime) { if (lastStmtIdWriteOpTime.isNull()) return; @@ -136,7 +137,8 @@ void onWriteOpCompleted(OperationContext* opCtx, std::move(stmtIdsWritten), lastStmtIdWriteOpTime, lastStmtIdWriteDate, - txnState); + txnState, + startOpTime); } /** @@ -527,7 +529,13 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, std::back_inserter(stmtIdsWritten), [](const InsertStatement& stmt) { return stmt.stmtId; }); - onWriteOpCompleted(opCtx, nss, stmtIdsWritten, lastOpTime, lastWriteDate, boost::none); + onWriteOpCompleted(opCtx, + nss, + stmtIdsWritten, + lastOpTime, + lastWriteDate, + boost::none, + boost::none /* startOpTime */); } size_t index = 0; @@ -589,7 +597,8 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg std::vector<StmtId>{args.updateArgs.stmtId}, opTime.writeOpTime, opTime.wallClockTime, - boost::none); + boost::none, + boost::none /* startOpTime */); } if (args.nss != NamespaceString::kSessionTransactionsTableNamespace) { @@ -650,7 +659,8 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, std::vector<StmtId>{stmtId}, opTime.writeOpTime, opTime.wallClockTime, - boost::none); + boost::none, + boost::none /* startOpTime */); } if (nss != NamespaceString::kSessionTransactionsTableNamespace) { @@ -1045,8 +1055,13 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, return prepare ? DurableTxnStateEnum::kPrepared : DurableTxnStateEnum::kCommitted; }(); - onWriteOpCompleted( - opCtx, cmdNss, {stmtId}, times.writeOpTime, times.wallClockTime, txnState); + onWriteOpCompleted(opCtx, + cmdNss, + {stmtId}, + times.writeOpTime, + times.wallClockTime, + txnState, + boost::none /* startOpTime */); return times; } catch (const AssertionException& e) { // Change the error code to TransactionTooLarge if it is BSONObjectTooLarge. @@ -1114,17 +1129,21 @@ void logOplogEntriesForTransaction(OperationContext* opCtx, stmtId = 0; const NamespaceString cmdNss{"admin", "$cmd"}; auto oplogSlot = oplogSlots.begin(); + const auto startOpTime = oplogSlot->opTime; for (const auto& stmt : stmts) { + bool isStartOfTxn = prevWriteOpTime.writeOpTime.isNull(); prevWriteOpTime = logReplOperationForTransaction( opCtx, sessionInfo, prevWriteOpTime.writeOpTime, stmtId, stmt, *oplogSlot++); - // This will update the transaction table for each oplog entry, so a read at any - // given timestamp in the transaction table will return the correct state. - onWriteOpCompleted(opCtx, - cmdNss, - {stmtId}, - prevWriteOpTime.writeOpTime, - prevWriteOpTime.wallClockTime, - DurableTxnStateEnum::kInProgress); + if (isStartOfTxn) { + // Update the transaction table only on the first transaction oplog entry. + onWriteOpCompleted(opCtx, + cmdNss, + {stmtId}, + prevWriteOpTime.writeOpTime, + prevWriteOpTime.wallClockTime, + DurableTxnStateEnum::kInProgress, + startOpTime); + } stmtId++; } wuow.commit(); @@ -1183,7 +1202,13 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx, oplogSlot); invariant(oplogSlot.opTime.isNull() || oplogSlot.opTime == oplogOpTime); - onWriteOpCompleted(opCtx, cmdNss, {stmtId}, oplogOpTime, wallClockTime, durableState); + onWriteOpCompleted(opCtx, + cmdNss, + {stmtId}, + oplogOpTime, + wallClockTime, + durableState, + boost::none /* startOpTime */); wuow.commit(); }); } @@ -1223,8 +1248,13 @@ repl::OpTime logCommitForUnpreparedTransaction(OperationContext* opCtx, // is not enforced at this level. invariant(oplogSlot.opTime.isNull() || oplogSlot.opTime == oplogOpTime); - onWriteOpCompleted( - opCtx, cmdNss, {stmtId}, oplogOpTime, wallClockTime, DurableTxnStateEnum::kCommitted); + onWriteOpCompleted(opCtx, + cmdNss, + {stmtId}, + oplogOpTime, + wallClockTime, + DurableTxnStateEnum::kCommitted, + boost::none /* startOpTime */); return oplogSlot.opTime; } @@ -1287,7 +1317,8 @@ void OpObserverImpl::onPreparedTransactionCommit( repl::OpTime logPrepareTransaction(OperationContext* opCtx, StmtId stmtId, const repl::OpTime& prevOpTime, - const OplogSlot oplogSlot) { + const OplogSlot oplogSlot, + const repl::OpTime& startOpTime) { const NamespaceString cmdNss{"admin", "$cmd"}; const auto wallClockTime = getWallClockTimeForOpLog(opCtx); @@ -1315,8 +1346,13 @@ repl::OpTime logPrepareTransaction(OperationContext* opCtx, oplogSlot); invariant(oplogSlot.opTime == oplogOpTime); - onWriteOpCompleted( - opCtx, cmdNss, {stmtId}, oplogOpTime, wallClockTime, DurableTxnStateEnum::kPrepared); + onWriteOpCompleted(opCtx, + cmdNss, + {stmtId}, + oplogOpTime, + wallClockTime, + DurableTxnStateEnum::kPrepared, + startOpTime /* startOpTime */); return oplogSlot.opTime; } @@ -1374,8 +1410,10 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, // The prevOpTime is the OpTime of the second last entry in the reserved slots. prevOpTime = reservedSlots.rbegin()[1].opTime; } + auto startTxnSlot = reservedSlots.front(); + const auto startOpTime = startTxnSlot.opTime; logPrepareTransaction( - opCtx, statements.size() /* stmtId */, prevOpTime, prepareOpTime); + opCtx, statements.size() /* stmtId */, prevOpTime, prepareOpTime, startOpTime); wuow.commit(); }); } diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index 3bc545ebba6..89eb3e49f08 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -460,7 +460,7 @@ public: WriteUnitOfWork wuow(opCtx); auto opTime = repl::OpTime(Timestamp(10, 1), 1); // Dummy timestamp. txnParticipant.onWriteOpCompletedOnPrimary( - opCtx, txnNum, {stmtId}, opTime, Date_t::now(), boost::none); + opCtx, txnNum, {stmtId}, opTime, Date_t::now(), boost::none, boost::none); wuow.commit(); } } @@ -630,6 +630,26 @@ protected: ASSERT(!cursor->more()); } + void assertTxnRecordStartOpTime(boost::optional<repl::OpTime> startOpTime) { + DBDirectClient client(opCtx()); + auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace, + {BSON("_id" << session()->getSessionId().toBSON())}); + ASSERT(cursor); + ASSERT(cursor->more()); + + auto txnRecordObj = cursor->next(); + auto txnRecord = + SessionTxnRecord::parse(IDLParserErrorContext("SessionEntryWritten"), txnRecordObj); + ASSERT(!cursor->more()); + ASSERT_EQ(session()->getSessionId(), txnRecord.getSessionId()); + if (!startOpTime) { + ASSERT(!txnRecord.getStartOpTime()); + } else { + ASSERT(txnRecord.getStartOpTime()); + ASSERT_EQ(*startOpTime, *txnRecord.getStartOpTime()); + } + } + Session* session() { return OperationContextSession::get(opCtx()); } @@ -1567,11 +1587,14 @@ TEST_F(OpObserverMultiEntryTransactionTest, auto prepareEntryObj = oplogEntryObjs.back(); const auto prepareOplogEntry = assertGet(OplogEntry::parse(prepareEntryObj)); checkSessionAndTransactionFields(prepareEntryObj, 0); + // The startOpTime should refer to the prepare oplog entry for an empty transaction. + const auto startOpTime = prepareOplogEntry.getOpTime(); ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp()); ASSERT_BSONOBJ_EQ(BSON("prepareTransaction" << 1), prepareOplogEntry.getObject()); txnParticipant.stashTransactionResources(opCtx()); assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared); + assertTxnRecordStartOpTime(startOpTime); txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction"); ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime()); @@ -1853,6 +1876,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) { const auto insertEntry = assertGet(OplogEntry::parse(oplogEntryObjs[0])); ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kInsert); + const auto startOpTime = insertEntry.getOpTime(); + const auto prepareTimestamp = prepareOpTime.getTimestamp(); const auto prepareEntry = assertGet(OplogEntry::parse(oplogEntryObjs[1])); @@ -1865,6 +1890,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) { ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime()); txnParticipant.stashTransactionResources(opCtx()); assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared); + assertTxnRecordStartOpTime(startOpTime); txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction"); // Mimic committing the transaction. @@ -1893,6 +1919,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) { ASSERT_BSONOBJ_EQ(oExpected, o); assertTxnRecord(txnNum(), commitSlot.opTime, DurableTxnStateEnum::kCommitted); + // startTimestamp should no longer be set once the transaction has been committed. + assertTxnRecordStartOpTime(boost::none); } TEST_F(OpObserverMultiEntryTransactionTest, AbortPreparedTest) { @@ -1923,6 +1951,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, AbortPreparedTest) { const auto insertEntry = assertGet(OplogEntry::parse(oplogEntryObjs[0])); ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kInsert); + const auto startOpTime = insertEntry.getOpTime(); const auto prepareTimestamp = prepareOpTime.getTimestamp(); @@ -1936,6 +1965,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, AbortPreparedTest) { ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime()); txnParticipant.stashTransactionResources(opCtx()); assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared); + assertTxnRecordStartOpTime(startOpTime); txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction"); // Mimic aborting the transaction by resetting the WUOW. @@ -1955,6 +1985,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, AbortPreparedTest) { ASSERT_BSONOBJ_EQ(oExpected, o); assertTxnRecord(txnNum(), abortSlot.opTime, DurableTxnStateEnum::kAborted); + // startOpTime should no longer be set once a transaction has been aborted. + assertTxnRecordStartOpTime(boost::none); } } // namespace diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 16cec7c5e7a..773b9d03ccd 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -1865,7 +1865,8 @@ void TransactionParticipant::Participant::onWriteOpCompletedOnPrimary( std::vector<StmtId> stmtIdsWritten, const repl::OpTime& lastStmtIdWriteOpTime, Date_t lastStmtIdWriteDate, - boost::optional<DurableTxnStateEnum> txnState) { + boost::optional<DurableTxnStateEnum> txnState, + boost::optional<repl::OpTime> startOpTime) { invariant(opCtx->lockState()->inAWriteUnitOfWork()); invariant(txnNumber == o().activeTxnNumber); @@ -1879,7 +1880,7 @@ void TransactionParticipant::Participant::onWriteOpCompletedOnPrimary( } const auto updateRequest = - _makeUpdateRequest(lastStmtIdWriteOpTime, lastStmtIdWriteDate, txnState); + _makeUpdateRequest(lastStmtIdWriteOpTime, lastStmtIdWriteDate, txnState, startOpTime); repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx); @@ -1898,8 +1899,8 @@ void TransactionParticipant::Participant::onMigrateCompletedOnPrimary( // We do not migrate transaction oplog entries so don't set the txn state const auto txnState = boost::none; - const auto updateRequest = - _makeUpdateRequest(lastStmtIdWriteOpTime, oplogLastStmtIdWriteDate, txnState); + const auto updateRequest = _makeUpdateRequest( + lastStmtIdWriteOpTime, oplogLastStmtIdWriteDate, txnState, boost::none /* startOpTime */); repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx); @@ -2027,7 +2028,8 @@ boost::optional<repl::OpTime> TransactionParticipant::Participant::_checkStateme UpdateRequest TransactionParticipant::Participant::_makeUpdateRequest( const repl::OpTime& newLastWriteOpTime, Date_t newLastWriteDate, - boost::optional<DurableTxnStateEnum> newState) const { + boost::optional<DurableTxnStateEnum> newState, + boost::optional<repl::OpTime> startOpTime) const { UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace); const auto updateBSON = [&] { @@ -2037,10 +2039,15 @@ UpdateRequest TransactionParticipant::Participant::_makeUpdateRequest( newTxnRecord.setLastWriteOpTime(newLastWriteOpTime); newTxnRecord.setLastWriteDate(newLastWriteDate); newTxnRecord.setState(newState); - if (newState == DurableTxnStateEnum::kPrepared) { + if (gUseMultipleOplogEntryFormatForTransactions && startOpTime) { + // The startOpTime should only be set when transitioning the txn to in-progress or + // prepared. + invariant(newState == DurableTxnStateEnum::kInProgress || + newState == DurableTxnStateEnum::kPrepared); + newTxnRecord.setStartOpTime(*startOpTime); + } else if (newState == DurableTxnStateEnum::kPrepared) { newTxnRecord.setStartOpTime(o().prepareOpTime); } - return newTxnRecord.toBSON(); }(); updateRequest.setUpdates(updateBSON); diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index 2cbb0aaaf7d..974e0e2fa6b 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -535,7 +535,8 @@ public: std::vector<StmtId> stmtIdsWritten, const repl::OpTime& lastStmtIdWriteOpTime, Date_t lastStmtIdWriteDate, - boost::optional<DurableTxnStateEnum> txnState); + boost::optional<DurableTxnStateEnum> txnState, + boost::optional<repl::OpTime> startOpTime); /** * Called after an entry for the specified session and transaction has been written to the @@ -655,7 +656,8 @@ public: UpdateRequest _makeUpdateRequest(const repl::OpTime& newLastWriteOpTime, Date_t newLastWriteDate, - boost::optional<DurableTxnStateEnum> newState) const; + boost::optional<DurableTxnStateEnum> newState, + boost::optional<repl::OpTime> startOpTime) const; void _registerUpdateCacheOnCommit(OperationContext* opCtx, std::vector<StmtId> stmtIdsWritten, diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp index 6f51d4ddf7c..84a9da1c91f 100644 --- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp +++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp @@ -238,7 +238,7 @@ protected: const auto opTime = logOp(opCtx(), kNss, uuid, session->getSessionId(), txnNum, stmtId, prevOpTime); txnParticipant.onWriteOpCompletedOnPrimary( - opCtx(), txnNum, {stmtId}, opTime, Date_t::now(), txnState); + opCtx(), txnNum, {stmtId}, opTime, Date_t::now(), txnState, boost::none); wuow.commit(); return opTime; @@ -398,7 +398,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionTransactionsCollectionN const auto uuid = UUID::gen(); const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, 0); ASSERT_THROWS(txnParticipant.onWriteOpCompletedOnPrimary( - opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none), + opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none, boost::none), AssertionException); } @@ -457,7 +457,7 @@ DEATH_TEST_F(TransactionParticipantRetryableWritesTest, WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, 0); txnParticipant.onWriteOpCompletedOnPrimary( - opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none); + opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none, boost::none); wuow.commit(); } @@ -466,7 +466,7 @@ DEATH_TEST_F(TransactionParticipantRetryableWritesTest, WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum - 1, 0); txnParticipant.onWriteOpCompletedOnPrimary( - opCtx(), txnNum - 1, {0}, opTime, Date_t::now(), boost::none); + opCtx(), txnNum - 1, {0}, opTime, Date_t::now(), boost::none, boost::none); } } @@ -486,7 +486,7 @@ DEATH_TEST_F(TransactionParticipantRetryableWritesTest, txnParticipant.invalidate(opCtx()); txnParticipant.onWriteOpCompletedOnPrimary( - opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none); + opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none, boost::none); } TEST_F(TransactionParticipantRetryableWritesTest, IncompleteHistoryDueToOpLogTruncation) { @@ -591,7 +591,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, ErrorOnlyWhenStmtIdBeingChecke false /* inTxn */, OplogSlot()); txnParticipant.onWriteOpCompletedOnPrimary( - opCtx(), txnNum, {1}, opTime, wallClockTime, boost::none); + opCtx(), txnNum, {1}, opTime, wallClockTime, boost::none, boost::none); wuow.commit(); return opTime; @@ -621,8 +621,13 @@ TEST_F(TransactionParticipantRetryableWritesTest, ErrorOnlyWhenStmtIdBeingChecke false /* inTxn */, OplogSlot()); - txnParticipant.onWriteOpCompletedOnPrimary( - opCtx(), txnNum, {kIncompleteHistoryStmtId}, opTime, wallClockTime, boost::none); + txnParticipant.onWriteOpCompletedOnPrimary(opCtx(), + txnNum, + {kIncompleteHistoryStmtId}, + opTime, + wallClockTime, + boost::none, + boost::none); wuow.commit(); } diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index c4e2e35c856..95afd85a963 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -2914,7 +2914,8 @@ public: sessionInfo = getSessionTxnInfoAtTimestamp(secondOplogEntryTs, true); ASSERT_EQ(sessionInfo["state"].String(), "inProgress"); - ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), secondOplogEntryTs); + // The transaction table is only updated at the start of the transaction. + ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), firstOplogEntryTs); sessionInfo = getSessionTxnInfoAtTimestamp(commitEntryTs, true); ASSERT_EQ(sessionInfo["state"].String(), "committed"); @@ -3091,7 +3092,8 @@ public: sessionInfo = getSessionTxnInfoAtTimestamp(secondOplogEntryTs, true); ASSERT_EQ(sessionInfo["state"].String(), "inProgress"); - ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), secondOplogEntryTs); + // The transaction table is only updated at the start of the transaction. + ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), firstOplogEntryTs); sessionInfo = getSessionTxnInfoAtTimestamp(prepareEntryTs, true); ASSERT_EQ(sessionInfo["state"].String(), "prepared"); |