summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Jason Chan <jason.chan@10gen.com> 2019-05-14 13:04:26 -0400
committer Jason Chan <jason.chan@10gen.com> 2019-05-14 13:08:08 -0400
commit 02b4ea3a929731cb34a3eeb3190051da42e1f2d3 (patch)
tree d22045418146babb183745717ff8d6be4e766c37
parent 4ac5a397e19dfe0a731c39b27a340dbfd38a8346 (diff)
download mongo-02b4ea3a929731cb34a3eeb3190051da42e1f2d3.tar.gz
SERVER-39438 Write "abortTransaction" oplog entry when aborting unprepared transactions with replicated operations
-rw-r--r-- src/mongo/db/SConscript 3
-rw-r--r-- src/mongo/db/op_observer_impl.cpp 14
-rw-r--r-- src/mongo/db/repl/sync_tail.cpp 15
-rw-r--r-- src/mongo/db/transaction_participant.cpp 28
-rw-r--r-- src/mongo/db/transaction_participant.h 4
-rw-r--r-- src/mongo/db/transaction_participant_test.cpp 89
6 files changed, 132 insertions, 21 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 0859eb526a9..07a9442ae44 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -2077,9 +2077,10 @@ env.CppUnitTest(
'auth/authmocks',
'dbdirectclient',
'logical_session_id',
- 'op_observer',
+ 'op_observer_impl',
'query_exec',
'repl/mock_repl_coord_server_fixture',
+ 'repl/oplog_interface_local',
'repl/repl_coordinator_interface',
'service_liaison_mock',
'sessions_collection_mock',
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 7c9d168c378..ca756c63a2b 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -1267,10 +1267,10 @@ int logOplogEntriesForTransaction(OperationContext* opCtx,
return stmtId;
}
-void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx,
- const OplogSlot& oplogSlot,
- const BSONObj& objectField,
- DurableTxnStateEnum durableState) {
+void logCommitOrAbortForTransaction(OperationContext* opCtx,
+ const OplogSlot& oplogSlot,
+ const BSONObj& objectField,
+ DurableTxnStateEnum durableState) {
const NamespaceString cmdNss{"admin", "$cmd"};
OperationSessionInfo sessionInfo;
@@ -1299,7 +1299,7 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx,
invariant(!opCtx->lockState()->hasMaxLockTimeout());
writeConflictRetry(
- opCtx, "onPreparedTransactionCommitOrAbort", NamespaceString::kRsOplogNamespace.ns(), [&] {
+ opCtx, "onTransactionCommitOrAbort", NamespaceString::kRsOplogNamespace.ns(), [&] {
// Writes to the oplog only require a Global intent lock. Guaranteed by
// OplogSlotReserver.
@@ -1452,7 +1452,7 @@ void OpObserverImpl::onPreparedTransactionCommit(
ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42) {
cmdObj.setPrepared(true);
}
- logCommitOrAbortForPreparedTransaction(
+ logCommitOrAbortForTransaction(
opCtx, commitOplogEntryOpTime, cmdObj.toBSON(), DurableTxnStateEnum::kCommitted);
}
@@ -1606,7 +1606,7 @@ void OpObserverImpl::onTransactionAbort(OperationContext* opCtx,
}
AbortTransactionOplogObject cmdObj;
- logCommitOrAbortForPreparedTransaction(
+ logCommitOrAbortForTransaction(
opCtx, *abortOplogEntryOpTime, cmdObj.toBSON(), DurableTxnStateEnum::kAborted);
}
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 6203d6f37f3..2f3269450c0 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -1150,16 +1150,19 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
// yet.
if (op.isPartialTransaction()) {
auto& partialTxnList = partialTxnOps[*op.getSessionId()];
- if (!partialTxnList.empty() &&
- partialTxnList.front()->getTxnNumber() != op.getTxnNumber()) {
- // TODO: When abortTransaction is implemented, this should invariant and
- // the list should be cleared on abort.
- partialTxnList.clear();
- }
+ // If this operation belongs to an existing partial transaction, partialTxnList
+ // must contain the previous operations of the transaction.
+ invariant(partialTxnList.empty() ||
+ partialTxnList.front()->getTxnNumber() == op.getTxnNumber());
partialTxnList.push_back(&op);
continue;
}
+ if (op.getCommandType() == OplogEntry::CommandType::kAbortTransaction) {
+ auto& partialTxnList = partialTxnOps[*op.getSessionId()];
+ partialTxnList.clear();
+ }
+
if (op.isCrudOpType()) {
auto collProperties = collPropertiesCache.getCollectionProperties(opCtx, hashedNs);
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 2301b02d541..9bb23ab8e86 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -1180,6 +1180,21 @@ void TransactionParticipant::Participant::commitUnpreparedTransaction(OperationC
auto txnOps = retrieveCompletedTransactionOperations(opCtx);
auto opObserver = opCtx->getServiceContext()->getOpObserver();
invariant(opObserver);
+
+ auto abortGuard = makeGuard([&] {
+ if (gUseMultipleOplogEntryFormatForTransactions &&
+ serverGlobalParams.featureCompatibility.getVersion() ==
+ ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42) {
+ // We should already be holding the RSTL if we have performed a read or write
+ // as part of this unprepared transaction.
+ invariant(opCtx->lockState()->isRSTLLocked());
+ opCtx->runWithoutInterruptionExceptAtGlobalShutdown([&] {
+ _abortActiveTransaction(
+ opCtx, TransactionState::kInProgress, true /* writeOplog */);
+ });
+ }
+ });
+
opObserver->onUnpreparedTransactionCommit(opCtx, txnOps);
auto wc = opCtx->getWriteConcern();
@@ -1194,6 +1209,8 @@ void TransactionParticipant::Participant::commitUnpreparedTransaction(OperationC
o(lk).txnState.transitionTo(TransactionState::kCommittingWithoutPrepare);
}
+ abortGuard.dismiss();
+
try {
// Once entering "committing without prepare" we cannot throw an exception.
UninterruptibleLockGuard noInterrupt(opCtx->lockState());
@@ -1383,7 +1400,9 @@ void TransactionParticipant::Participant::abortActiveTransaction(OperationContex
replCoord->canAcceptWritesForDatabase(opCtx, "admin"));
}
- _abortActiveTransaction(opCtx, TransactionState::kInProgress | TransactionState::kPrepared);
+ _abortActiveTransaction(opCtx,
+ TransactionState::kInProgress | TransactionState::kPrepared,
+ o().txnState.isPrepared());
}
void TransactionParticipant::Participant::abortActiveUnpreparedOrStashPreparedTransaction(
@@ -1400,7 +1419,7 @@ void TransactionParticipant::Participant::abortActiveUnpreparedOrStashPreparedTr
return;
}
- _abortActiveTransaction(opCtx, TransactionState::kInProgress);
+ _abortActiveTransaction(opCtx, TransactionState::kInProgress, false /* writeOplog */);
} catch (...) {
// It is illegal for this to throw so we catch and log this here for diagnosability.
severe() << "Caught exception during transaction " << opCtx->getTxnNumber()
@@ -1410,7 +1429,7 @@ void TransactionParticipant::Participant::abortActiveUnpreparedOrStashPreparedTr
}
void TransactionParticipant::Participant::_abortActiveTransaction(
- OperationContext* opCtx, TransactionState::StateSet expectedStates) {
+ OperationContext* opCtx, TransactionState::StateSet expectedStates, bool writeOplog) {
invariant(!o().txnResourceStash);
invariant(!o().txnState.isCommittingWithPrepare());
@@ -1419,14 +1438,13 @@ void TransactionParticipant::Participant::_abortActiveTransaction(
o(lk).transactionMetricsObserver.onTransactionOperation(
opCtx, CurOp::get(opCtx)->debug().additiveMetrics, o().txnState.isPrepared());
}
-
// We reserve an oplog slot before aborting the transaction so that no writes that are causally
// related to the transaction abort enter the oplog at a timestamp earlier than the abort oplog
// entry. On secondaries, we generate a fake empty oplog slot, since it's not used by the
// OpObserver.
boost::optional<OplogSlotReserver> oplogSlotReserver;
boost::optional<OplogSlot> abortOplogSlot;
- if (o().txnState.isPrepared() && opCtx->writesAreReplicated()) {
+ if (opCtx->writesAreReplicated() && writeOplog) {
oplogSlotReserver.emplace(opCtx);
abortOplogSlot = oplogSlotReserver->getLastSlot();
}
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index a76241912fd..4925c38b825 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -727,8 +727,10 @@ public:
// Abort the transaction if it's in one of the expected states and clean up the transaction
// states associated with the opCtx.
+ // If 'writeOplog' is true, logs an 'abortTransaction' oplog entry if writes are replicated.
void _abortActiveTransaction(OperationContext* opCtx,
- TransactionState::StateSet expectedStates);
+ TransactionState::StateSet expectedStates,
+ bool writeOplog);
// Releases stashed transaction resources to abort the transaction on the session.
void _abortTransactionOnSession(OperationContext* opCtx);
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index ab26ab2f1a3..0b40335468f 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -34,13 +34,16 @@
#include "mongo/db/client.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/op_observer_impl.h"
#include "mongo/db/op_observer_noop.h"
#include "mongo/db/op_observer_registry.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/mock_repl_coord_server_fixture.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/repl/oplog_interface_local.h"
#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/server_transactions_metrics.h"
#include "mongo/db/service_context.h"
#include "mongo/db/session_catalog_mongod.h"
@@ -60,6 +63,9 @@
namespace mongo {
namespace {
+using repl::OplogEntry;
+using unittest::assertGet;
+
const NamespaceString kNss("TestDB", "TestColl");
/**
@@ -1021,7 +1027,7 @@ TEST_F(TxnParticipantTest, CannotInsertInPreparedTransaction) {
ASSERT(_opObserver->transactionPrepared);
}
-TEST_F(TxnParticipantTest, ImplictAbortDoesNotAbortPreparedTransaction) {
+TEST_F(TxnParticipantTest, ImplicitAbortDoesNotAbortPreparedTransaction) {
auto outerScopedSession = checkOutSession();
auto txnParticipant = TransactionParticipant::get(opCtx());
@@ -3788,5 +3794,86 @@ TEST_F(TxnParticipantTest, ExitPreparePromiseIsFulfilledOnAbortPreparedTransacti
ASSERT_TRUE(txnParticipant.onExitPrepare().isReady());
}
+class MultiEntryOplogTxnParticipantTest : public TxnParticipantTest {  // Multi-entry oplog fixture.
+ void setUp() override {  // NOTE(review): private via 'class' default access -- confirm intended.
+ gUseMultipleOplogEntryFormatForTransactions = true;  // Reset to false in tearDown().
+ TxnParticipantTest::setUp();
+ // Install a mock ReplicationCoordinator built from createReplSettings(), then create the oplog.
+ auto service = opCtx()->getServiceContext();
+ repl::ReplicationCoordinator::set(
+ service,
+ stdx::make_unique<repl::ReplicationCoordinatorMock>(service, createReplSettings()));
+ repl::setOplogCollectionName(service);
+ repl::createOplog(opCtx());
+
+ // Switch the node to PRIMARY; ASSERT_OK fails the test if the mode change is rejected.
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx());
+ ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY));
+ }
+
+ void tearDown() override {
+ TxnParticipantTest::tearDown();
+ gUseMultipleOplogEntryFormatForTransactions = false;  // Undo the flag set in setUp().
+ }
+
+protected:
+ // Asserts that the oplog has exactly 'n' entries and returns them, undoing the iterator's order.
+ std::vector<BSONObj> getNOplogEntries(OperationContext* opCtx, int n) {
+ std::vector<BSONObj> result(n);
+ repl::OplogInterfaceLocal oplogInterface(opCtx);
+ auto oplogIter = oplogInterface.makeIterator();
+ for (int i = n - 1; i >= 0; i--) {
+ // The oplog iterator yields entries in reverse order, so fill 'result' from the back.
+ auto opEntry = unittest::assertGet(oplogIter->next());
+ result[i] = opEntry.first;
+ }
+ ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, oplogIter->next().getStatus());
+ return result;
+ }
+
+ // Asserts that the oplog has exactly one entry and returns that entry.
+ BSONObj getSingleOplogEntry(OperationContext* opCtx) {
+ return getNOplogEntries(opCtx, 1).back();
+ }
+
+private:
+ // Builds the ReplSettings handed to the mock ReplicationCoordinator in setUp().
+ virtual repl::ReplSettings createReplSettings() {
+ repl::ReplSettings settings;
+ settings.setOplogSizeBytes(1 * 1024 * 1024);  // 1 MiB.
+ settings.setReplSetString("mySet/node1:12345");
+ return settings;
+ }
+};
+
+TEST_F(MultiEntryOplogTxnParticipantTest, ErrorOnUnpreparedCommitAbortsTransaction) {
+ OpObserverImpl opObserver;  // Real observer; the mock's abort hook below forwards to it.
+ auto sessionCheckout = checkOutSession();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction");
+
+ // A failed unprepared commit should trigger 'onTransactionAbort'; forward it to 'opObserver'.
+ _opObserver->onTransactionAbortFn = [&]() {
+ auto abortSlot = repl::getNextOpTime(opCtx());
+ opObserver.onTransactionAbort(opCtx(), abortSlot);
+ };
+
+ _opObserver->onUnpreparedTransactionCommitThrowsException = true;
+ ASSERT_THROWS_CODE(txnParticipant.commitUnpreparedTransaction(opCtx()),
+ AssertionException,
+ ErrorCodes::OperationFailed);
+
+ auto oplogEntry = getSingleOplogEntry(opCtx());  // Also asserts the oplog holds one entry.
+
+ auto abortEntry = assertGet(OplogEntry::parse(oplogEntry));
+ auto o = abortEntry.getObject();
+
+ const auto oExpected = BSON("abortTransaction" << 1);
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+
+ ASSERT_FALSE(_opObserver->unpreparedTransactionCommitted);
+ ASSERT_TRUE(txnParticipant.transactionIsAborted());
+}
+
} // namespace
} // namespace mongo