diff options
author | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2018-12-11 18:19:10 -0500 |
---|---|---|
committer | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2019-01-07 21:20:54 -0500 |
commit | c2892222a633609fd14706062d8ed6086352004d (patch) | |
tree | 7693f9400cdb027fab872fa9073b61b038cf9d86 /src/mongo/db | |
parent | 327a6bd87961eb7d3cd2a4cd90170e868adf2112 (diff) | |
download | mongo-c2892222a633609fd14706062d8ed6086352004d.tar.gz |
SERVER-38282 Step-up reacquires locks for prepared transactions
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/session_catalog_mongod.cpp | 36 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 39 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.h | 9 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant_test.cpp | 41 |
6 files changed, 115 insertions, 36 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index c649dd4e832..ec3a987a8a1 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -507,8 +507,15 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC const auto opTimeToReturn = fassert(28665, loadLastOpTime(opCtx)); _shardingOnTransitionToPrimaryHook(opCtx); + + // This has to go before reaquiring locks for prepared transactions, otherwise this can be + // blocked by prepared transactions. _dropAllTempCollections(opCtx); + MongoDSessionCatalog::onStepUp(opCtx); + + notifyFreeMonitoringOnTransitionToPrimary(); + // It is only necessary to check the system indexes on the first transition to master. // On subsequent transitions to master the indexes will have already been created. static std::once_flag verifySystemIndexesOnce; @@ -789,10 +796,6 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook validator->enableKeyGenerator(opCtx, true); } } - - MongoDSessionCatalog::onStepUp(opCtx); - - notifyFreeMonitoringOnTransitionToPrimary(); } void ReplicationCoordinatorExternalStateImpl::signalApplierToChooseNewSyncSource() { diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 1b313c4c440..e6753e65d39 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -1006,21 +1006,6 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx, _externalState->onDrainComplete(opCtx); - if (MONGO_FAIL_POINT(transitionToPrimaryHangBeforeTakingGlobalExclusiveLock)) { - log() << "transition to primary - " - "transitionToPrimaryHangBeforeTakingGlobalExclusiveLock fail point enabled. " - "Blocking until fail point is disabled."; - while (MONGO_FAIL_POINT(transitionToPrimaryHangBeforeTakingGlobalExclusiveLock)) { - mongo::sleepsecs(1); - { - stdx::lock_guard<stdx::mutex> lock(_mutex); - if (_inShutdown) { - break; - } - } - } - } - ReplicationStateTransitionLockGuard transitionGuard(opCtx); lk.lock(); diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp index 01bc5424644..c5fae50fe9c 100644 --- a/src/mongo/db/session_catalog_mongod.cpp +++ b/src/mongo/db/session_catalog_mongod.cpp @@ -106,17 +106,39 @@ void MongoDSessionCatalog::onStepUp(OperationContext* opCtx) { // The use of shared_ptr here is in order to work around the limitation of stdx::function that // the functor must be copyable. auto sessionKillTokens = std::make_shared<std::vector<Session::KillToken>>(); + + // Scan all sessions and reacquire locks for prepared transactions. + // There may be sessions that are checked out during this scan, but none of them + // can be prepared transactions, since only oplog application can make transactions + // prepared on secondaries and oplog application has been stopped at this moment. + std::vector<LogicalSessionId> sessionIdToReacquireLocks; + SessionKiller::Matcher matcher( KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)}); - catalog->scanSessions( - matcher, [&sessionKillTokens](WithLock sessionCatalogLock, Session* session) { - const auto txnParticipant = TransactionParticipant::get(session); - if (!txnParticipant->inMultiDocumentTransaction()) { - sessionKillTokens->emplace_back(session->kill(sessionCatalogLock)); - } - }); + catalog->scanSessions(matcher, [&](WithLock sessionCatalogLock, Session* session) { + const auto txnParticipant = TransactionParticipant::get(session); + if (!txnParticipant->inMultiDocumentTransaction()) { + sessionKillTokens->emplace_back(session->kill(sessionCatalogLock)); + } + + if (txnParticipant->transactionIsPrepared()) { + sessionIdToReacquireLocks.emplace_back(session->getSessionId()); + } + }); killSessionTokensFunction(opCtx, sessionKillTokens); + { + // Create a new opCtx because we need an empty locker to refresh the locks. + auto newClient = opCtx->getServiceContext()->makeClient("restore-prepared-txn"); + AlternativeClientRegion acr(newClient); + auto newOpCtx = cc().makeOperationContext(); + for (const auto& sessionId : sessionIdToReacquireLocks) { + auto scopedSessionCheckOut = catalog->checkOutSession(newOpCtx.get(), sessionId); + auto txnParticipant = TransactionParticipant::get(scopedSessionCheckOut.get()); + txnParticipant->refreshLocksForPreparedTransaction(newOpCtx.get(), false); + } + } + const size_t initialExtentSize = 0; const bool capped = false; const bool maxSize = 0; diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 3ff49ba646e..a0281e04a52 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -540,7 +540,7 @@ TransactionParticipant::OplogSlotReserver::~OplogSlotReserver() { } } -TransactionParticipant::TxnResources::TxnResources(OperationContext* opCtx, bool keepTicket) { +TransactionParticipant::TxnResources::TxnResources(OperationContext* opCtx, StashStyle stashStyle) { // We must lock the Client to change the Locker on the OperationContext. stdx::lock_guard<Client> lk(*opCtx->getClient()); @@ -551,13 +551,13 @@ TransactionParticipant::TxnResources::TxnResources(OperationContext* opCtx, bool // Inherit the locking setting from the original one. opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication( _locker->shouldConflictWithSecondaryBatchApplication()); - if (!keepTicket) { + if (stashStyle != StashStyle::kSideTransaction) { _locker->releaseTicket(); } _locker->unsetThreadId(); // On secondaries, we yield the locks for transactions. - if (!opCtx->writesAreReplicated()) { + if (stashStyle == StashStyle::kSecondary) { _lockSnapshot = std::make_unique<Locker::LockSnapshot>(); _locker->releaseWriteUnitOfWork(_lockSnapshot.get()); } @@ -565,12 +565,12 @@ TransactionParticipant::TxnResources::TxnResources(OperationContext* opCtx, bool // This thread must still respect the transaction lock timeout, since it can prevent the // transaction from making progress. auto maxTransactionLockMillis = maxTransactionLockRequestTimeoutMillis.load(); - if (opCtx->writesAreReplicated() && maxTransactionLockMillis >= 0) { + if (stashStyle != StashStyle::kSecondary && maxTransactionLockMillis >= 0) { opCtx->lockState()->setMaxLockTimeout(Milliseconds(maxTransactionLockMillis)); } // On secondaries, max lock timeout must not be set. - invariant(opCtx->writesAreReplicated() || !opCtx->lockState()->hasMaxLockTimeout()); + invariant(stashStyle != StashStyle::kSecondary || !opCtx->lockState()->hasMaxLockTimeout()); _recoveryUnit = opCtx->releaseRecoveryUnit(); opCtx->setRecoveryUnit(std::unique_ptr<RecoveryUnit>( @@ -632,7 +632,8 @@ void TransactionParticipant::TxnResources::release(OperationContext* opCtx) { TransactionParticipant::SideTransactionBlock::SideTransactionBlock(OperationContext* opCtx) : _opCtx(opCtx) { if (_opCtx->getWriteUnitOfWork()) { - _txnResources = TransactionParticipant::TxnResources(_opCtx, true /* keepTicket*/); + _txnResources = TransactionParticipant::TxnResources( + _opCtx, TxnResources::StashStyle::kSideTransaction); } } @@ -659,7 +660,9 @@ void TransactionParticipant::_stashActiveTransaction(WithLock, OperationContext* } invariant(!_txnResourceStash); - _txnResourceStash = TxnResources(opCtx); + auto stashStyle = opCtx->writesAreReplicated() ? TxnResources::StashStyle::kPrimary + : TxnResources::StashStyle::kSecondary; + _txnResourceStash = TxnResources(opCtx, stashStyle); } @@ -800,6 +803,28 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx } } +void TransactionParticipant::refreshLocksForPreparedTransaction(OperationContext* opCtx, + bool yieldLocks) { + // The opCtx will be used to swap locks, so it cannot hold any lock. + invariant(!opCtx->lockState()->isRSTLLocked()); + invariant(!opCtx->lockState()->isLocked()); + + stdx::unique_lock<stdx::mutex> lk(_mutex); + + // The node must have txn resource. + invariant(_txnResourceStash); + invariant(_txnState.isPrepared(lk)); + + // Transfer the txn resource from the stash to the operation context. + _txnResourceStash->release(opCtx); + _txnResourceStash = boost::none; + + // Transfer the txn resource back from the operation context to the stash. + auto stashStyle = + yieldLocks ? TxnResources::StashStyle::kSecondary : TxnResources::StashStyle::kPrimary; + _txnResourceStash = TxnResources(opCtx, stashStyle); +} + Timestamp TransactionParticipant::prepareTransaction(OperationContext* opCtx, boost::optional<repl::OpTime> prepareOptime) { stdx::unique_lock<stdx::mutex> lk(_mutex); diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index 6334090f4a0..67aadb20aa1 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -87,11 +87,13 @@ public: */ class TxnResources { public: + enum class StashStyle { kPrimary, kSecondary, kSideTransaction }; + /** * Stashes transaction state from 'opCtx' in the newly constructed TxnResources. * Ephemerally holds the Client lock associated with opCtx. */ - TxnResources(OperationContext* opCtx, bool keepTicket = false); + TxnResources(OperationContext* opCtx, StashStyle stashStyle); ~TxnResources(); // Rule of 5: because we have a class-defined destructor, we need to explictly specify @@ -290,6 +292,11 @@ public: std::vector<repl::ReplOperation> endTransactionAndRetrieveOperations(OperationContext* opCtx); /** + * Yield or reacquire locks for prepared transacitons, used on replication state transition. + */ + void refreshLocksForPreparedTransaction(OperationContext* opCtx, bool yieldLocks); + + /** * May only be called while a multi-document transaction is not committed and adds the multi-key * path info to the set of path infos to be updated at commit time. */ diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index ea599cf2d7c..e9271227c3a 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -246,10 +246,11 @@ protected: func(newOpCtx.get()); } - std::unique_ptr<MongoDOperationContextSession> checkOutSession() { + std::unique_ptr<MongoDOperationContextSession> checkOutSession( + boost::optional<bool> startNewTxn = true) { auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); - txnParticipant->beginOrContinue(*opCtx()->getTxnNumber(), false, true); + txnParticipant->beginOrContinue(*opCtx()->getTxnNumber(), false, startNewTxn); return opCtxSession; } @@ -1552,6 +1553,42 @@ TEST_F(TxnParticipantTest, ThrowDuringPreparedOnTransactionAbortIsFatal) { ErrorCodes::OperationFailed); } +TEST_F(TxnParticipantTest, ReacquireLocksForPreparedTransactionsOnStepUp) { + ASSERT(opCtx()->writesAreReplicated()); + + // Prepare a transaction on secondary. + { + auto sessionCheckout = checkOutSession(); + auto txnParticipant = TransactionParticipant::get(opCtx()); + + // Simulate a transaction on secondary. + repl::UnreplicatedWritesBlock uwb(opCtx()); + ASSERT(!opCtx()->writesAreReplicated()); + + txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction"); + // Simulate the locking of an insert. + { + Lock::DBLock dbLock(opCtx(), "test", MODE_IX); + Lock::CollectionLock collLock(opCtx()->lockState(), "test.foo", MODE_IX); + } + txnParticipant->prepareTransaction(opCtx(), repl::OpTime({1, 1}, 1)); + txnParticipant->stashTransactionResources(opCtx()); + // Secondary yields locks for prepared transactions. + ASSERT_FALSE(txnParticipant->getTxnResourceStashLockerForTest()->isLocked()); + } + + // Step-up will restore the locks of prepared transactions. + ASSERT(opCtx()->writesAreReplicated()); + MongoDSessionCatalog::onStepUp(opCtx()); + { + auto sessionCheckout = checkOutSession({}); + auto txnParticipant = TransactionParticipant::get(opCtx()); + ASSERT(txnParticipant->getTxnResourceStashLockerForTest()->isLocked()); + txnParticipant->unstashTransactionResources(opCtx(), "abortTransaction"); + txnParticipant->abortActiveTransaction(opCtx()); + } +} + /** * Test fixture for transactions metrics. */ |