summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSiyuan Zhou <siyuan.zhou@mongodb.com>2018-12-11 18:19:10 -0500
committerSiyuan Zhou <siyuan.zhou@mongodb.com>2019-01-07 21:20:54 -0500
commitc2892222a633609fd14706062d8ed6086352004d (patch)
tree7693f9400cdb027fab872fa9073b61b038cf9d86 /src
parent327a6bd87961eb7d3cd2a4cd90170e868adf2112 (diff)
downloadmongo-c2892222a633609fd14706062d8ed6086352004d.tar.gz
SERVER-38282 Step-up reacquires locks for prepared transactions
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp11
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp15
-rw-r--r--src/mongo/db/session_catalog_mongod.cpp36
-rw-r--r--src/mongo/db/transaction_participant.cpp39
-rw-r--r--src/mongo/db/transaction_participant.h9
-rw-r--r--src/mongo/db/transaction_participant_test.cpp41
6 files changed, 115 insertions, 36 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index c649dd4e832..ec3a987a8a1 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -507,8 +507,15 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC
const auto opTimeToReturn = fassert(28665, loadLastOpTime(opCtx));
_shardingOnTransitionToPrimaryHook(opCtx);
+
+ // This has to go before reaquiring locks for prepared transactions, otherwise this can be
+ // blocked by prepared transactions.
_dropAllTempCollections(opCtx);
+ MongoDSessionCatalog::onStepUp(opCtx);
+
+ notifyFreeMonitoringOnTransitionToPrimary();
+
// It is only necessary to check the system indexes on the first transition to master.
// On subsequent transitions to master the indexes will have already been created.
static std::once_flag verifySystemIndexesOnce;
@@ -789,10 +796,6 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
validator->enableKeyGenerator(opCtx, true);
}
}
-
- MongoDSessionCatalog::onStepUp(opCtx);
-
- notifyFreeMonitoringOnTransitionToPrimary();
}
void ReplicationCoordinatorExternalStateImpl::signalApplierToChooseNewSyncSource() {
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 1b313c4c440..e6753e65d39 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1006,21 +1006,6 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx,
_externalState->onDrainComplete(opCtx);
- if (MONGO_FAIL_POINT(transitionToPrimaryHangBeforeTakingGlobalExclusiveLock)) {
- log() << "transition to primary - "
- "transitionToPrimaryHangBeforeTakingGlobalExclusiveLock fail point enabled. "
- "Blocking until fail point is disabled.";
- while (MONGO_FAIL_POINT(transitionToPrimaryHangBeforeTakingGlobalExclusiveLock)) {
- mongo::sleepsecs(1);
- {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- if (_inShutdown) {
- break;
- }
- }
- }
- }
-
ReplicationStateTransitionLockGuard transitionGuard(opCtx);
lk.lock();
diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp
index 01bc5424644..c5fae50fe9c 100644
--- a/src/mongo/db/session_catalog_mongod.cpp
+++ b/src/mongo/db/session_catalog_mongod.cpp
@@ -106,17 +106,39 @@ void MongoDSessionCatalog::onStepUp(OperationContext* opCtx) {
// The use of shared_ptr here is in order to work around the limitation of stdx::function that
// the functor must be copyable.
auto sessionKillTokens = std::make_shared<std::vector<Session::KillToken>>();
+
+ // Scan all sessions and reacquire locks for prepared transactions.
+ // There may be sessions that are checked out during this scan, but none of them
+ // can be prepared transactions, since only oplog application can make transactions
+ // prepared on secondaries and oplog application has been stopped at this moment.
+ std::vector<LogicalSessionId> sessionIdToReacquireLocks;
+
SessionKiller::Matcher matcher(
KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)});
- catalog->scanSessions(
- matcher, [&sessionKillTokens](WithLock sessionCatalogLock, Session* session) {
- const auto txnParticipant = TransactionParticipant::get(session);
- if (!txnParticipant->inMultiDocumentTransaction()) {
- sessionKillTokens->emplace_back(session->kill(sessionCatalogLock));
- }
- });
+ catalog->scanSessions(matcher, [&](WithLock sessionCatalogLock, Session* session) {
+ const auto txnParticipant = TransactionParticipant::get(session);
+ if (!txnParticipant->inMultiDocumentTransaction()) {
+ sessionKillTokens->emplace_back(session->kill(sessionCatalogLock));
+ }
+
+ if (txnParticipant->transactionIsPrepared()) {
+ sessionIdToReacquireLocks.emplace_back(session->getSessionId());
+ }
+ });
killSessionTokensFunction(opCtx, sessionKillTokens);
+ {
+ // Create a new opCtx because we need an empty locker to refresh the locks.
+ auto newClient = opCtx->getServiceContext()->makeClient("restore-prepared-txn");
+ AlternativeClientRegion acr(newClient);
+ auto newOpCtx = cc().makeOperationContext();
+ for (const auto& sessionId : sessionIdToReacquireLocks) {
+ auto scopedSessionCheckOut = catalog->checkOutSession(newOpCtx.get(), sessionId);
+ auto txnParticipant = TransactionParticipant::get(scopedSessionCheckOut.get());
+ txnParticipant->refreshLocksForPreparedTransaction(newOpCtx.get(), false);
+ }
+ }
+
const size_t initialExtentSize = 0;
const bool capped = false;
const bool maxSize = 0;
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 3ff49ba646e..a0281e04a52 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -540,7 +540,7 @@ TransactionParticipant::OplogSlotReserver::~OplogSlotReserver() {
}
}
-TransactionParticipant::TxnResources::TxnResources(OperationContext* opCtx, bool keepTicket) {
+TransactionParticipant::TxnResources::TxnResources(OperationContext* opCtx, StashStyle stashStyle) {
// We must lock the Client to change the Locker on the OperationContext.
stdx::lock_guard<Client> lk(*opCtx->getClient());
@@ -551,13 +551,13 @@ TransactionParticipant::TxnResources::TxnResources(OperationContext* opCtx, bool
// Inherit the locking setting from the original one.
opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(
_locker->shouldConflictWithSecondaryBatchApplication());
- if (!keepTicket) {
+ if (stashStyle != StashStyle::kSideTransaction) {
_locker->releaseTicket();
}
_locker->unsetThreadId();
// On secondaries, we yield the locks for transactions.
- if (!opCtx->writesAreReplicated()) {
+ if (stashStyle == StashStyle::kSecondary) {
_lockSnapshot = std::make_unique<Locker::LockSnapshot>();
_locker->releaseWriteUnitOfWork(_lockSnapshot.get());
}
@@ -565,12 +565,12 @@ TransactionParticipant::TxnResources::TxnResources(OperationContext* opCtx, bool
// This thread must still respect the transaction lock timeout, since it can prevent the
// transaction from making progress.
auto maxTransactionLockMillis = maxTransactionLockRequestTimeoutMillis.load();
- if (opCtx->writesAreReplicated() && maxTransactionLockMillis >= 0) {
+ if (stashStyle != StashStyle::kSecondary && maxTransactionLockMillis >= 0) {
opCtx->lockState()->setMaxLockTimeout(Milliseconds(maxTransactionLockMillis));
}
// On secondaries, max lock timeout must not be set.
- invariant(opCtx->writesAreReplicated() || !opCtx->lockState()->hasMaxLockTimeout());
+ invariant(stashStyle != StashStyle::kSecondary || !opCtx->lockState()->hasMaxLockTimeout());
_recoveryUnit = opCtx->releaseRecoveryUnit();
opCtx->setRecoveryUnit(std::unique_ptr<RecoveryUnit>(
@@ -632,7 +632,8 @@ void TransactionParticipant::TxnResources::release(OperationContext* opCtx) {
TransactionParticipant::SideTransactionBlock::SideTransactionBlock(OperationContext* opCtx)
: _opCtx(opCtx) {
if (_opCtx->getWriteUnitOfWork()) {
- _txnResources = TransactionParticipant::TxnResources(_opCtx, true /* keepTicket*/);
+ _txnResources = TransactionParticipant::TxnResources(
+ _opCtx, TxnResources::StashStyle::kSideTransaction);
}
}
@@ -659,7 +660,9 @@ void TransactionParticipant::_stashActiveTransaction(WithLock, OperationContext*
}
invariant(!_txnResourceStash);
- _txnResourceStash = TxnResources(opCtx);
+ auto stashStyle = opCtx->writesAreReplicated() ? TxnResources::StashStyle::kPrimary
+ : TxnResources::StashStyle::kSecondary;
+ _txnResourceStash = TxnResources(opCtx, stashStyle);
}
@@ -800,6 +803,28 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx
}
}
+void TransactionParticipant::refreshLocksForPreparedTransaction(OperationContext* opCtx,
+ bool yieldLocks) {
+ // The opCtx will be used to swap locks, so it cannot hold any lock.
+ invariant(!opCtx->lockState()->isRSTLLocked());
+ invariant(!opCtx->lockState()->isLocked());
+
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ // The node must have txn resource.
+ invariant(_txnResourceStash);
+ invariant(_txnState.isPrepared(lk));
+
+ // Transfer the txn resource from the stash to the operation context.
+ _txnResourceStash->release(opCtx);
+ _txnResourceStash = boost::none;
+
+ // Transfer the txn resource back from the operation context to the stash.
+ auto stashStyle =
+ yieldLocks ? TxnResources::StashStyle::kSecondary : TxnResources::StashStyle::kPrimary;
+ _txnResourceStash = TxnResources(opCtx, stashStyle);
+}
+
Timestamp TransactionParticipant::prepareTransaction(OperationContext* opCtx,
boost::optional<repl::OpTime> prepareOptime) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index 6334090f4a0..67aadb20aa1 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -87,11 +87,13 @@ public:
*/
class TxnResources {
public:
+ enum class StashStyle { kPrimary, kSecondary, kSideTransaction };
+
/**
* Stashes transaction state from 'opCtx' in the newly constructed TxnResources.
* Ephemerally holds the Client lock associated with opCtx.
*/
- TxnResources(OperationContext* opCtx, bool keepTicket = false);
+ TxnResources(OperationContext* opCtx, StashStyle stashStyle);
~TxnResources();
// Rule of 5: because we have a class-defined destructor, we need to explictly specify
@@ -290,6 +292,11 @@ public:
std::vector<repl::ReplOperation> endTransactionAndRetrieveOperations(OperationContext* opCtx);
/**
+ * Yield or reacquire locks for prepared transacitons, used on replication state transition.
+ */
+ void refreshLocksForPreparedTransaction(OperationContext* opCtx, bool yieldLocks);
+
+ /**
* May only be called while a multi-document transaction is not committed and adds the multi-key
* path info to the set of path infos to be updated at commit time.
*/
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index ea599cf2d7c..e9271227c3a 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -246,10 +246,11 @@ protected:
func(newOpCtx.get());
}
- std::unique_ptr<MongoDOperationContextSession> checkOutSession() {
+ std::unique_ptr<MongoDOperationContextSession> checkOutSession(
+ boost::optional<bool> startNewTxn = true) {
auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx());
auto txnParticipant = TransactionParticipant::get(opCtx());
- txnParticipant->beginOrContinue(*opCtx()->getTxnNumber(), false, true);
+ txnParticipant->beginOrContinue(*opCtx()->getTxnNumber(), false, startNewTxn);
return opCtxSession;
}
@@ -1552,6 +1553,42 @@ TEST_F(TxnParticipantTest, ThrowDuringPreparedOnTransactionAbortIsFatal) {
ErrorCodes::OperationFailed);
}
+TEST_F(TxnParticipantTest, ReacquireLocksForPreparedTransactionsOnStepUp) {
+ ASSERT(opCtx()->writesAreReplicated());
+
+ // Prepare a transaction on secondary.
+ {
+ auto sessionCheckout = checkOutSession();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+
+ // Simulate a transaction on secondary.
+ repl::UnreplicatedWritesBlock uwb(opCtx());
+ ASSERT(!opCtx()->writesAreReplicated());
+
+ txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction");
+ // Simulate the locking of an insert.
+ {
+ Lock::DBLock dbLock(opCtx(), "test", MODE_IX);
+ Lock::CollectionLock collLock(opCtx()->lockState(), "test.foo", MODE_IX);
+ }
+ txnParticipant->prepareTransaction(opCtx(), repl::OpTime({1, 1}, 1));
+ txnParticipant->stashTransactionResources(opCtx());
+ // Secondary yields locks for prepared transactions.
+ ASSERT_FALSE(txnParticipant->getTxnResourceStashLockerForTest()->isLocked());
+ }
+
+ // Step-up will restore the locks of prepared transactions.
+ ASSERT(opCtx()->writesAreReplicated());
+ MongoDSessionCatalog::onStepUp(opCtx());
+ {
+ auto sessionCheckout = checkOutSession({});
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ ASSERT(txnParticipant->getTxnResourceStashLockerForTest()->isLocked());
+ txnParticipant->unstashTransactionResources(opCtx(), "abortTransaction");
+ txnParticipant->abortActiveTransaction(opCtx());
+ }
+}
+
/**
* Test fixture for transactions metrics.
*/