diff options
-rw-r--r-- | src/mongo/db/operation_context_session_mongod.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/apply_ops.cpp | 50 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 25 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.h | 2 |
7 files changed, 90 insertions, 46 deletions
diff --git a/src/mongo/db/operation_context_session_mongod.cpp b/src/mongo/db/operation_context_session_mongod.cpp index c2163b89bff..7ae26abcd2c 100644 --- a/src/mongo/db/operation_context_session_mongod.cpp +++ b/src/mongo/db/operation_context_session_mongod.cpp @@ -81,7 +81,7 @@ OperationContextSessionMongodWithoutRefresh::OperationContextSessionMongodWithou auto txnParticipant = TransactionParticipant::get(opCtx); invariant(txnParticipant); - txnParticipant->beginTransactionUnconditionally(clientTxnNumber); + txnParticipant->beginOrContinueTransactionUnconditionally(clientTxnNumber); } } // namespace mongo diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp index 622b0a0eb64..3530c71aca7 100644 --- a/src/mongo/db/repl/apply_ops.cpp +++ b/src/mongo/db/repl/apply_ops.cpp @@ -127,9 +127,15 @@ Status _applyOps(OperationContext* opCtx, Status status(ErrorCodes::InternalError, ""); if (haveWrappingWUOW) { - invariant(opCtx->lockState()->isW()); + // Atomic applyOps command already acquired the global write lock. + invariant(opCtx->lockState()->isW() || + oplogApplicationMode != repl::OplogApplication::Mode::kApplyOpsCmd); + // Only CRUD operations are allowed in atomic mode. invariant(*opType != 'c'); + // ApplyOps does not have the global writer lock when applying transaction + // operations, so we need to acquire the DB and Collection locks. + Lock::DBLock dbLock(opCtx, nss.db(), MODE_IX); auto db = DatabaseHolder::getDatabaseHolder().get(opCtx, nss.ns()); if (!db) { // Retry in non-atomic mode, since MMAP cannot implicitly create a new database @@ -144,6 +150,7 @@ Status _applyOps(OperationContext* opCtx, // implicitly created on upserts. We detect both cases here and fail early with // NamespaceNotFound. // Additionally for inserts, we fail early on non-existent collections. + Lock::CollectionLock collectionLock(opCtx->lockState(), nss.ns(), MODE_IX); auto collection = db->getCollection(opCtx, nss); if (!collection && (*opType == 'i' || *opType == 'u')) { uasserted( @@ -283,18 +290,13 @@ Status _applyPrepareTransaction(OperationContext* opCtx, invariant(transaction); transaction->unstashTransactionResources(opCtx, "prepareTransaction"); - - // Abort transaction unconditionally for now. - // TODO: SERVER-35875 / SERVER-35877 Abort or commit transactions on secondaries accordingly. - ON_BLOCK_EXIT([&] { transaction->abortActiveTransaction(opCtx); }); - auto status = _applyOps( opCtx, dbName, applyOpCmd, info, oplogApplicationMode, result, numApplied, opsBuilder); if (!status.isOK()) { return status; } transaction->prepareTransaction(opCtx, optime); - + transaction->stashTransactionResources(opCtx); return Status::OK(); } @@ -383,6 +385,24 @@ Status applyOps(OperationContext* opCtx, BSONObjBuilder* result) { auto info = ApplyOpsCommandInfo::parse(applyOpCmd); + int numApplied = 0; + + // Apply prepare transaction operation if "prepare" is true. + // The lock requirement of transaction operations should be the same as that on the primary, + // so we don't acquire the locks conservatively for them. + if (info.getPrepare().get_value_or(false)) { + invariant(optime); + return _applyPrepareTransaction(opCtx, + dbName, + applyOpCmd, + info, + oplogApplicationMode, + result, + &numApplied, + nullptr, + *optime); + } + boost::optional<Lock::GlobalWrite> globalWriteLock; boost::optional<Lock::DBLock> dbWriteLock; @@ -411,22 +431,6 @@ Status applyOps(OperationContext* opCtx, } } - int numApplied = 0; - - // Apply prepare transaction operation if "prepare" is true. - if (info.getPrepare().get_value_or(false)) { - invariant(optime); - return _applyPrepareTransaction(opCtx, - dbName, - applyOpCmd, - info, - oplogApplicationMode, - result, - &numApplied, - nullptr, - *optime); - } - if (!info.isAtomic()) { return _applyOps( opCtx, dbName, applyOpCmd, info, oplogApplicationMode, result, &numApplied, nullptr); diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 50d7d62825d..499312b3b14 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -86,6 +86,7 @@ #include "mongo/db/stats/counters.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/db/storage/storage_options.h" +#include "mongo/db/transaction_participant.h" #include "mongo/platform/random.h" #include "mongo/scripting/engine.h" #include "mongo/stdx/memory.h" @@ -998,7 +999,14 @@ std::map<std::string, ApplyOpMetadata> opsMap = { const BSONElement& ui, BSONObj& cmd, const OpTime& opTime, - OplogApplication::Mode mode) -> Status { return Status::OK(); }}}, + OplogApplication::Mode mode) -> Status { + // Session has been checked out by sync_tail. + auto transaction = TransactionParticipant::get(opCtx); + invariant(transaction); + transaction->unstashTransactionResources(opCtx, "abortTransaction"); + transaction->abortActiveTransaction(opCtx); + return Status::OK(); + }}}, }; } // namespace @@ -1529,6 +1537,9 @@ Status applyCommand_inlock(OperationContext* opCtx, return {ErrorCodes::InvalidNamespace, "invalid ns: " + std::string(nss.ns())}; } { + // Command application doesn't always acquire the global writer lock for transaction + // commands, so we acquire its own locks here. + Lock::DBLock lock(opCtx, nss.db(), MODE_IS); Database* db = DatabaseHolder::getDatabaseHolder().get(opCtx, nss.ns()); if (db && !db->getCollection(opCtx, nss) && db->getViewCatalog()->lookup(opCtx, nss.ns())) { return {ErrorCodes::CommandNotSupportedOnView, @@ -1553,10 +1564,6 @@ Status applyCommand_inlock(OperationContext* opCtx, << redact(op)); } - // Applying commands in repl is done under Global W-lock, so it is safe to not - // perform the current DB checks after reacquiring the lock. - invariant(opCtx->lockState()->isW()); - // Parse optime from oplog entry unless we are applying this command in standalone or on a // primary (replicated writes enabled). OpTime opTime; @@ -1567,7 +1574,7 @@ Status applyCommand_inlock(OperationContext* opCtx, } } - const bool assignCommandTimestamp = [opCtx, mode, &op] { + const bool assignCommandTimestamp = [opCtx, mode, &op, &o] { const auto replMode = ReplicationCoordinator::get(opCtx)->getReplicationMode(); if (opCtx->writesAreReplicated()) { // We do not assign timestamps on replicated writes since they will get their oplog @@ -1575,6 +1582,11 @@ Status applyCommand_inlock(OperationContext* opCtx, return false; } + // Don't assign commit timestamp for transaction commands. + const StringData commandName(o.firstElementFieldName()); + if (op.getBoolField("prepare") || commandName == "abortTransaction") + return false; + switch (replMode) { case ReplicationCoordinator::modeReplSet: { // The 'applyOps' command never logs 'applyOps' oplog entries with nested @@ -1582,9 +1594,7 @@ Status applyCommand_inlock(OperationContext* opCtx, // command on secondaries. Thus, the timestamps in the command oplog // entries are always real timestamps from this oplog and we should // timestamp our writes with them. - // - // However, if "prepare" is specified, don't assign commit timestamp. - return !op.getBoolField("prepare"); + return true; } case ReplicationCoordinator::modeNone: { // Only assign timestamps on standalones during replication recovery when diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index ee5cea520f8..1ae37147ac6 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -49,6 +49,7 @@ #include "mongo/db/commands/fsync.h" #include "mongo/db/commands/server_status_metric.h" #include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/concurrency/lock_state.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" @@ -303,9 +304,16 @@ Status SyncTail::syncApply(OperationContext* opCtx, }); } else if (opType == OpTypeEnum::kCommand) { return writeConflictRetry(opCtx, "syncApply_command", nss.ns(), [&] { - // a command may need a global write lock. so we will conservatively go - // ahead and grab one here. suboptimal. :-( - Lock::GlobalWrite globalWriteLock(opCtx); + // A command may need a global write lock. so we will conservatively go + // ahead and grab one for non-transaction commands. + // Transactions have to acquire the same locks on secondaries as on primary. + boost::optional<Lock::GlobalWrite> globalWriteLock; + + // TODO SERVER-37180 Remove this double-parsing. + const StringData commandName(op["o"].embeddedObject().firstElementFieldName()); + if (!op.getBoolField("prepare") && commandName != "abortTransaction") { + globalWriteLock.emplace(opCtx); + } // special case apply for commands to avoid implicit database creation Status status = applyCommand_inlock(opCtx, op, oplogApplicationMode); @@ -1130,7 +1138,10 @@ Status multiSyncApply(OperationContext* opCtx, UnreplicatedWritesBlock uwb(opCtx); DisableDocumentValidation validationDisabler(opCtx); - ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock(opCtx->lockState()); + // Since we swap the locker in stash / unstash transaction resources, + // ShouldNotConflictWithSecondaryBatchApplicationBlock will touch the locker that has been + // destroyed by unstash in its destructor. Thus we set the flag explicitly. + opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false); // Explicitly start future read transactions without a timestamp. opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); @@ -1152,7 +1163,7 @@ Status multiSyncApply(OperationContext* opCtx, MultikeyPathTracker::get(opCtx).startTrackingMultikeyPathInfo(); for (auto it = ops->cbegin(); it != ops->cend(); ++it) { - const auto& entry = **it; + const OplogEntry& entry = **it; // If we are successful in grouping and applying inserts, advance the current iterator // past the end of the inserted group of entries. @@ -1168,11 +1179,12 @@ Status multiSyncApply(OperationContext* opCtx, // from disk may read that write, causing starting a new transaction on an existing // txnNumber. Thus, we start a new transaction without refreshing state from disk. boost::optional<OperationContextSessionMongodWithoutRefresh> sessionTxnState; - if (entry.shouldPrepare()) { - // Prepare transaction is in its own batch. We cannot modify the opCtx for other - // ops. + if (entry.shouldPrepare() || + (entry.isCommand() && + entry.getCommandType() == OplogEntry::CommandType::kAbortTransaction)) { // The update on transaction table may be scheduled to the same writer. invariant(ops->size() <= 2); + // Transaction operations are in its own batch, so we can modify their opCtx. invariant(entry.getSessionId()); invariant(entry.getTxnNumber()); opCtx->setLogicalSessionId(*entry.getSessionId()); diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 8043df51d68..439bd7e7602 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -818,6 +818,7 @@ void execCommandDatabase(OperationContext* opCtx, opCtx->getTxnNumber()); opCtx->lockState()->setSharedLocksShouldTwoPhaseLock(true); + opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false); } auto& oss = OperationShardingState::get(opCtx); diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 3278730d7ea..d848960eb42 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -260,9 +260,12 @@ void TransactionParticipant::beginOrContinue(TxnNumber txnNumber, _beginMultiDocumentTransaction(lg, txnNumber); } -void TransactionParticipant::beginTransactionUnconditionally(TxnNumber txnNumber) { +void TransactionParticipant::beginOrContinueTransactionUnconditionally(TxnNumber txnNumber) { stdx::lock_guard<stdx::mutex> lg(_mutex); - _beginMultiDocumentTransaction(lg, txnNumber); + // Continuing transaction unconditionally is a no-op since we don't check any on-disk state. + if (_activeTxnNumber != txnNumber) { + _beginMultiDocumentTransaction(lg, txnNumber); + } } void TransactionParticipant::setSpeculativeTransactionOpTime( @@ -302,8 +305,13 @@ TransactionParticipant::OplogSlotReserver::OplogSlotReserver(OperationContext* o // The new transaction should have an empty locker, and thus we do not need to save it. invariant(opCtx->lockState()->getClientState() == Locker::ClientState::kInactive); _locker = opCtx->swapLockState(stdx::make_unique<LockerImpl>()); + // Inherit the locking setting from the original one. + opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication( + _locker->shouldConflictWithSecondaryBatchApplication()); _locker->unsetThreadId(); + // OplogSlotReserver is only used by primary, so always set max transaction lock timeout. + invariant(opCtx->writesAreReplicated()); // This thread must still respect the transaction lock timeout, since it can prevent the // transaction from making progress. auto maxTransactionLockMillis = maxTransactionLockRequestTimeoutMillis.load(); @@ -337,6 +345,9 @@ TransactionParticipant::TxnResources::TxnResources(OperationContext* opCtx, bool opCtx->setWriteUnitOfWork(nullptr); _locker = opCtx->swapLockState(stdx::make_unique<LockerImpl>()); + // Inherit the locking setting from the original one. + opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication( + _locker->shouldConflictWithSecondaryBatchApplication()); if (!keepTicket) { _locker->releaseTicket(); } @@ -345,10 +356,13 @@ TransactionParticipant::TxnResources::TxnResources(OperationContext* opCtx, bool // This thread must still respect the transaction lock timeout, since it can prevent the // transaction from making progress. auto maxTransactionLockMillis = maxTransactionLockRequestTimeoutMillis.load(); - if (maxTransactionLockMillis >= 0) { + if (opCtx->writesAreReplicated() && maxTransactionLockMillis >= 0) { opCtx->lockState()->setMaxLockTimeout(Milliseconds(maxTransactionLockMillis)); } + // On secondaries, max lock timeout must not be set. + invariant(opCtx->writesAreReplicated() || !opCtx->lockState()->hasMaxLockTimeout()); + _recoveryUnit = opCtx->releaseRecoveryUnit(); opCtx->setRecoveryUnit(std::unique_ptr<RecoveryUnit>( opCtx->getServiceContext()->getStorageEngine()->newRecoveryUnit()), @@ -504,10 +518,13 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx // to acquire a lock. This is to avoid deadlocks and minimize non-transaction // operation performance degradations. auto maxTransactionLockMillis = maxTransactionLockRequestTimeoutMillis.load(); - if (maxTransactionLockMillis >= 0) { + if (opCtx->writesAreReplicated() && maxTransactionLockMillis >= 0) { opCtx->lockState()->setMaxLockTimeout(Milliseconds(maxTransactionLockMillis)); } + // On secondaries, max lock timeout must not be set. + invariant(opCtx->writesAreReplicated() || !opCtx->lockState()->hasMaxLockTimeout()); + stdx::lock_guard<stdx::mutex> lm(_metricsMutex); _transactionMetricsObserver.onUnstash(ServerTransactionsMetrics::get(opCtx), curTimeMicros64()); diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index d287a548ee5..70042946744 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -352,7 +352,7 @@ public: boost::optional<bool> autocommit, boost::optional<bool> startTransaction); - void beginTransactionUnconditionally(TxnNumber txnNumber); + void beginOrContinueTransactionUnconditionally(TxnNumber txnNumber); static Status isValid(StringData dbName, StringData cmdName); |