diff options
author | Jason Chan <jason.chan@10gen.com> | 2019-05-14 13:04:26 -0400 |
---|---|---|
committer | Jason Chan <jason.chan@10gen.com> | 2019-05-14 13:08:08 -0400 |
commit | 02b4ea3a929731cb34a3eeb3190051da42e1f2d3 (patch) | |
tree | d22045418146babb183745717ff8d6be4e766c37 | |
parent | 4ac5a397e19dfe0a731c39b27a340dbfd38a8346 (diff) | |
download | mongo-02b4ea3a929731cb34a3eeb3190051da42e1f2d3.tar.gz |
SERVER-39438 Write "abortTransaction" oplog entry when aborting unprepared transactions with replicated operations
-rw-r--r-- | src/mongo/db/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.h | 4 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant_test.cpp | 89 |
6 files changed, 132 insertions, 21 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 0859eb526a9..07a9442ae44 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -2077,9 +2077,10 @@ env.CppUnitTest( 'auth/authmocks', 'dbdirectclient', 'logical_session_id', - 'op_observer', + 'op_observer_impl', 'query_exec', 'repl/mock_repl_coord_server_fixture', + 'repl/oplog_interface_local', 'repl/repl_coordinator_interface', 'service_liaison_mock', 'sessions_collection_mock', diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 7c9d168c378..ca756c63a2b 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -1267,10 +1267,10 @@ int logOplogEntriesForTransaction(OperationContext* opCtx, return stmtId; } -void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx, - const OplogSlot& oplogSlot, - const BSONObj& objectField, - DurableTxnStateEnum durableState) { +void logCommitOrAbortForTransaction(OperationContext* opCtx, + const OplogSlot& oplogSlot, + const BSONObj& objectField, + DurableTxnStateEnum durableState) { const NamespaceString cmdNss{"admin", "$cmd"}; OperationSessionInfo sessionInfo; @@ -1299,7 +1299,7 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx, invariant(!opCtx->lockState()->hasMaxLockTimeout()); writeConflictRetry( - opCtx, "onPreparedTransactionCommitOrAbort", NamespaceString::kRsOplogNamespace.ns(), [&] { + opCtx, "onTransactionCommitOrAbort", NamespaceString::kRsOplogNamespace.ns(), [&] { // Writes to the oplog only require a Global intent lock. Guaranteed by // OplogSlotReserver. @@ -1452,7 +1452,7 @@ void OpObserverImpl::onPreparedTransactionCommit( ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42) { cmdObj.setPrepared(true); } - logCommitOrAbortForPreparedTransaction( + logCommitOrAbortForTransaction( opCtx, commitOplogEntryOpTime, cmdObj.toBSON(), DurableTxnStateEnum::kCommitted); } @@ -1606,7 +1606,7 @@ void OpObserverImpl::onTransactionAbort(OperationContext* opCtx, } AbortTransactionOplogObject cmdObj; - logCommitOrAbortForPreparedTransaction( + logCommitOrAbortForTransaction( opCtx, *abortOplogEntryOpTime, cmdObj.toBSON(), DurableTxnStateEnum::kAborted); } diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 6203d6f37f3..2f3269450c0 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -1150,16 +1150,19 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, // yet. if (op.isPartialTransaction()) { auto& partialTxnList = partialTxnOps[*op.getSessionId()]; - if (!partialTxnList.empty() && - partialTxnList.front()->getTxnNumber() != op.getTxnNumber()) { - // TODO: When abortTransaction is implemented, this should invariant and - // the list should be cleared on abort. - partialTxnList.clear(); - } + // If this operation belongs to an existing partial transaction, partialTxnList + // must contain the previous operations of the transaction. + invariant(partialTxnList.empty() || + partialTxnList.front()->getTxnNumber() == op.getTxnNumber()); partialTxnList.push_back(&op); continue; } + if (op.getCommandType() == OplogEntry::CommandType::kAbortTransaction) { + auto& partialTxnList = partialTxnOps[*op.getSessionId()]; + partialTxnList.clear(); + } + if (op.isCrudOpType()) { auto collProperties = collPropertiesCache.getCollectionProperties(opCtx, hashedNs); diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 2301b02d541..9bb23ab8e86 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -1180,6 +1180,21 @@ void TransactionParticipant::Participant::commitUnpreparedTransaction(OperationC auto txnOps = retrieveCompletedTransactionOperations(opCtx); auto opObserver = opCtx->getServiceContext()->getOpObserver(); invariant(opObserver); + + auto abortGuard = makeGuard([&] { + if (gUseMultipleOplogEntryFormatForTransactions && + serverGlobalParams.featureCompatibility.getVersion() == + ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42) { + // We should already be holding the RSTL if we have performed a read or write + // as part of this unprepared transaction. + invariant(opCtx->lockState()->isRSTLLocked()); + opCtx->runWithoutInterruptionExceptAtGlobalShutdown([&] { + _abortActiveTransaction( + opCtx, TransactionState::kInProgress, true /* writeOplog */); + }); + } + }); + opObserver->onUnpreparedTransactionCommit(opCtx, txnOps); auto wc = opCtx->getWriteConcern(); @@ -1194,6 +1209,8 @@ void TransactionParticipant::Participant::commitUnpreparedTransaction(OperationC o(lk).txnState.transitionTo(TransactionState::kCommittingWithoutPrepare); } + abortGuard.dismiss(); + try { // Once entering "committing without prepare" we cannot throw an exception. UninterruptibleLockGuard noInterrupt(opCtx->lockState()); @@ -1383,7 +1400,9 @@ void TransactionParticipant::Participant::abortActiveTransaction(OperationContex replCoord->canAcceptWritesForDatabase(opCtx, "admin")); } - _abortActiveTransaction(opCtx, TransactionState::kInProgress | TransactionState::kPrepared); + _abortActiveTransaction(opCtx, + TransactionState::kInProgress | TransactionState::kPrepared, + o().txnState.isPrepared()); } void TransactionParticipant::Participant::abortActiveUnpreparedOrStashPreparedTransaction( @@ -1400,7 +1419,7 @@ void TransactionParticipant::Participant::abortActiveUnpreparedOrStashPreparedTr return; } - _abortActiveTransaction(opCtx, TransactionState::kInProgress); + _abortActiveTransaction(opCtx, TransactionState::kInProgress, false /* writeOplog */); } catch (...) { // It is illegal for this to throw so we catch and log this here for diagnosability. severe() << "Caught exception during transaction " << opCtx->getTxnNumber() @@ -1410,7 +1429,7 @@ void TransactionParticipant::Participant::abortActiveUnpreparedOrStashPreparedTr } void TransactionParticipant::Participant::_abortActiveTransaction( - OperationContext* opCtx, TransactionState::StateSet expectedStates) { + OperationContext* opCtx, TransactionState::StateSet expectedStates, bool writeOplog) { invariant(!o().txnResourceStash); invariant(!o().txnState.isCommittingWithPrepare()); @@ -1419,14 +1438,13 @@ void TransactionParticipant::Participant::_abortActiveTransaction( o(lk).transactionMetricsObserver.onTransactionOperation( opCtx, CurOp::get(opCtx)->debug().additiveMetrics, o().txnState.isPrepared()); } - // We reserve an oplog slot before aborting the transaction so that no writes that are causally // related to the transaction abort enter the oplog at a timestamp earlier than the abort oplog // entry. On secondaries, we generate a fake empty oplog slot, since it's not used by the // OpObserver. boost::optional<OplogSlotReserver> oplogSlotReserver; boost::optional<OplogSlot> abortOplogSlot; - if (o().txnState.isPrepared() && opCtx->writesAreReplicated()) { + if (opCtx->writesAreReplicated() && writeOplog) { oplogSlotReserver.emplace(opCtx); abortOplogSlot = oplogSlotReserver->getLastSlot(); } diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index a76241912fd..4925c38b825 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -727,8 +727,10 @@ public: // Abort the transaction if it's in one of the expected states and clean up the transaction // states associated with the opCtx. + // If 'writeOplog' is true, logs an 'abortTransaction' oplog entry if writes are replicated. void _abortActiveTransaction(OperationContext* opCtx, - TransactionState::StateSet expectedStates); + TransactionState::StateSet expectedStates, + bool writeOplog); // Releases stashed transaction resources to abort the transaction on the session. void _abortTransactionOnSession(OperationContext* opCtx); diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index ab26ab2f1a3..0b40335468f 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -34,13 +34,16 @@ #include "mongo/db/client.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" +#include "mongo/db/op_observer_impl.h" #include "mongo/db/op_observer_noop.h" #include "mongo/db/op_observer_registry.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/mock_repl_coord_server_fixture.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/repl/oplog_interface_local.h" #include "mongo/db/repl/optime.h" +#include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/server_transactions_metrics.h" #include "mongo/db/service_context.h" #include "mongo/db/session_catalog_mongod.h" @@ -60,6 +63,9 @@ namespace mongo { namespace { +using repl::OplogEntry; +using unittest::assertGet; + const NamespaceString kNss("TestDB", "TestColl"); /** @@ -1021,7 +1027,7 @@ TEST_F(TxnParticipantTest, CannotInsertInPreparedTransaction) { ASSERT(_opObserver->transactionPrepared); } -TEST_F(TxnParticipantTest, ImplictAbortDoesNotAbortPreparedTransaction) { +TEST_F(TxnParticipantTest, ImplicitAbortDoesNotAbortPreparedTransaction) { auto outerScopedSession = checkOutSession(); auto txnParticipant = TransactionParticipant::get(opCtx()); @@ -3788,5 +3794,86 @@ TEST_F(TxnParticipantTest, ExitPreparePromiseIsFulfilledOnAbortPreparedTransacti ASSERT_TRUE(txnParticipant.onExitPrepare().isReady()); } +class MultiEntryOplogTxnParticipantTest : public TxnParticipantTest { + void setUp() override { + gUseMultipleOplogEntryFormatForTransactions = true; + TxnParticipantTest::setUp(); + // Set up ReplicationCoordinator and create oplog. + auto service = opCtx()->getServiceContext(); + repl::ReplicationCoordinator::set( + service, + stdx::make_unique<repl::ReplicationCoordinatorMock>(service, createReplSettings())); + repl::setOplogCollectionName(service); + repl::createOplog(opCtx()); + + // Ensure that we are primary. + auto replCoord = repl::ReplicationCoordinator::get(opCtx()); + ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY)); + } + + void tearDown() override { + TxnParticipantTest::tearDown(); + gUseMultipleOplogEntryFormatForTransactions = false; + } + +protected: + // Assert that the oplog has the expected number of entries, and return them + std::vector<BSONObj> getNOplogEntries(OperationContext* opCtx, int n) { + std::vector<BSONObj> result(n); + repl::OplogInterfaceLocal oplogInterface(opCtx); + auto oplogIter = oplogInterface.makeIterator(); + for (int i = n - 1; i >= 0; i--) { + // The oplogIterator returns the entries in reverse order. + auto opEntry = unittest::assertGet(oplogIter->next()); + result[i] = opEntry.first; + } + ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, oplogIter->next().getStatus()); + return result; + } + + // Assert that oplog only has a single entry and return that oplog entry. + BSONObj getSingleOplogEntry(OperationContext* opCtx) { + return getNOplogEntries(opCtx, 1).back(); + } + +private: + // Creates a reasonable set of ReplSettings for most tests. + virtual repl::ReplSettings createReplSettings() { + repl::ReplSettings settings; + settings.setOplogSizeBytes(1 * 1024 * 1024); + settings.setReplSetString("mySet/node1:12345"); + return settings; + } +}; + +TEST_F(MultiEntryOplogTxnParticipantTest, ErrorOnUnpreparedCommitAbortsTransaction) { + OpObserverImpl opObserver; + auto sessionCheckout = checkOutSession(); + auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction"); + + // We expect to trigger 'onTransactionAbort' on failure of the unprepared commit. + _opObserver->onTransactionAbortFn = [&]() { + auto abortSlot = repl::getNextOpTime(opCtx()); + opObserver.onTransactionAbort(opCtx(), abortSlot); + }; + + _opObserver->onUnpreparedTransactionCommitThrowsException = true; + ASSERT_THROWS_CODE(txnParticipant.commitUnpreparedTransaction(opCtx()), + AssertionException, + ErrorCodes::OperationFailed); + + auto oplogEntry = getSingleOplogEntry(opCtx()); + + auto abortEntry = assertGet(OplogEntry::parse(oplogEntry)); + auto o = abortEntry.getObject(); + + const auto oExpected = BSON("abortTransaction" << 1); + ASSERT_BSONOBJ_EQ(oExpected, o); + + ASSERT_FALSE(_opObserver->unpreparedTransactionCommitted); + ASSERT_TRUE(txnParticipant.transactionIsAborted()); +} + } // namespace } // namespace mongo |