summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Siyuan Zhou <siyuan.zhou@mongodb.com> 2018-09-18 17:12:02 -0400
committer: Siyuan Zhou <siyuan.zhou@mongodb.com> 2018-09-26 20:00:24 -0400
commit: c74df030100ad57ba46c02ba505236f20d76b948 (patch)
tree: 603ff54350c0900e2df27b7e760894b365a62f80
parent: 9be782c954e7d24192c08252b98fe58e52c2071d (diff)
download: mongo-c74df030100ad57ba46c02ba505236f20d76b948.tar.gz
SERVER-35875 Secondaries abort transactions when applying abortTransaction oplog entries
-rw-r--r-- src/mongo/db/operation_context_session_mongod.cpp 2
-rw-r--r-- src/mongo/db/repl/apply_ops.cpp 50
-rw-r--r-- src/mongo/db/repl/oplog.cpp 28
-rw-r--r-- src/mongo/db/repl/sync_tail.cpp 28
-rw-r--r-- src/mongo/db/service_entry_point_common.cpp 1
-rw-r--r-- src/mongo/db/transaction_participant.cpp 25
-rw-r--r-- src/mongo/db/transaction_participant.h 2
7 files changed, 90 insertions, 46 deletions
diff --git a/src/mongo/db/operation_context_session_mongod.cpp b/src/mongo/db/operation_context_session_mongod.cpp
index c2163b89bff..7ae26abcd2c 100644
--- a/src/mongo/db/operation_context_session_mongod.cpp
+++ b/src/mongo/db/operation_context_session_mongod.cpp
@@ -81,7 +81,7 @@ OperationContextSessionMongodWithoutRefresh::OperationContextSessionMongodWithou
auto txnParticipant = TransactionParticipant::get(opCtx);
invariant(txnParticipant);
- txnParticipant->beginTransactionUnconditionally(clientTxnNumber);
+ txnParticipant->beginOrContinueTransactionUnconditionally(clientTxnNumber);
}
} // namespace mongo
diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp
index 622b0a0eb64..3530c71aca7 100644
--- a/src/mongo/db/repl/apply_ops.cpp
+++ b/src/mongo/db/repl/apply_ops.cpp
@@ -127,9 +127,15 @@ Status _applyOps(OperationContext* opCtx,
Status status(ErrorCodes::InternalError, "");
if (haveWrappingWUOW) {
- invariant(opCtx->lockState()->isW());
+ // Atomic applyOps command already acquired the global write lock.
+ invariant(opCtx->lockState()->isW() ||
+ oplogApplicationMode != repl::OplogApplication::Mode::kApplyOpsCmd);
+ // Only CRUD operations are allowed in atomic mode.
invariant(*opType != 'c');
+ // ApplyOps does not have the global writer lock when applying transaction
+ // operations, so we need to acquire the DB and Collection locks.
+ Lock::DBLock dbLock(opCtx, nss.db(), MODE_IX);
auto db = DatabaseHolder::getDatabaseHolder().get(opCtx, nss.ns());
if (!db) {
// Retry in non-atomic mode, since MMAP cannot implicitly create a new database
@@ -144,6 +150,7 @@ Status _applyOps(OperationContext* opCtx,
// implicitly created on upserts. We detect both cases here and fail early with
// NamespaceNotFound.
// Additionally for inserts, we fail early on non-existent collections.
+ Lock::CollectionLock collectionLock(opCtx->lockState(), nss.ns(), MODE_IX);
auto collection = db->getCollection(opCtx, nss);
if (!collection && (*opType == 'i' || *opType == 'u')) {
uasserted(
@@ -283,18 +290,13 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
invariant(transaction);
transaction->unstashTransactionResources(opCtx, "prepareTransaction");
-
- // Abort transaction unconditionally for now.
- // TODO: SERVER-35875 / SERVER-35877 Abort or commit transactions on secondaries accordingly.
- ON_BLOCK_EXIT([&] { transaction->abortActiveTransaction(opCtx); });
-
auto status = _applyOps(
opCtx, dbName, applyOpCmd, info, oplogApplicationMode, result, numApplied, opsBuilder);
if (!status.isOK()) {
return status;
}
transaction->prepareTransaction(opCtx, optime);
-
+ transaction->stashTransactionResources(opCtx);
return Status::OK();
}
@@ -383,6 +385,24 @@ Status applyOps(OperationContext* opCtx,
BSONObjBuilder* result) {
auto info = ApplyOpsCommandInfo::parse(applyOpCmd);
+ int numApplied = 0;
+
+ // Apply prepare transaction operation if "prepare" is true.
+ // The lock requirement of transaction operations should be the same as that on the primary,
+ // so we don't acquire the locks conservatively for them.
+ if (info.getPrepare().get_value_or(false)) {
+ invariant(optime);
+ return _applyPrepareTransaction(opCtx,
+ dbName,
+ applyOpCmd,
+ info,
+ oplogApplicationMode,
+ result,
+ &numApplied,
+ nullptr,
+ *optime);
+ }
+
boost::optional<Lock::GlobalWrite> globalWriteLock;
boost::optional<Lock::DBLock> dbWriteLock;
@@ -411,22 +431,6 @@ Status applyOps(OperationContext* opCtx,
}
}
- int numApplied = 0;
-
- // Apply prepare transaction operation if "prepare" is true.
- if (info.getPrepare().get_value_or(false)) {
- invariant(optime);
- return _applyPrepareTransaction(opCtx,
- dbName,
- applyOpCmd,
- info,
- oplogApplicationMode,
- result,
- &numApplied,
- nullptr,
- *optime);
- }
-
if (!info.isAtomic()) {
return _applyOps(
opCtx, dbName, applyOpCmd, info, oplogApplicationMode, result, &numApplied, nullptr);
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 50d7d62825d..499312b3b14 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -86,6 +86,7 @@
#include "mongo/db/stats/counters.h"
#include "mongo/db/storage/storage_engine.h"
#include "mongo/db/storage/storage_options.h"
+#include "mongo/db/transaction_participant.h"
#include "mongo/platform/random.h"
#include "mongo/scripting/engine.h"
#include "mongo/stdx/memory.h"
@@ -998,7 +999,14 @@ std::map<std::string, ApplyOpMetadata> opsMap = {
const BSONElement& ui,
BSONObj& cmd,
const OpTime& opTime,
- OplogApplication::Mode mode) -> Status { return Status::OK(); }}},
+ OplogApplication::Mode mode) -> Status {
+ // Session has been checked out by sync_tail.
+ auto transaction = TransactionParticipant::get(opCtx);
+ invariant(transaction);
+ transaction->unstashTransactionResources(opCtx, "abortTransaction");
+ transaction->abortActiveTransaction(opCtx);
+ return Status::OK();
+ }}},
};
} // namespace
@@ -1529,6 +1537,9 @@ Status applyCommand_inlock(OperationContext* opCtx,
return {ErrorCodes::InvalidNamespace, "invalid ns: " + std::string(nss.ns())};
}
{
+ // Command application doesn't always acquire the global writer lock for transaction
+ // commands, so we acquire its own locks here.
+ Lock::DBLock lock(opCtx, nss.db(), MODE_IS);
Database* db = DatabaseHolder::getDatabaseHolder().get(opCtx, nss.ns());
if (db && !db->getCollection(opCtx, nss) && db->getViewCatalog()->lookup(opCtx, nss.ns())) {
return {ErrorCodes::CommandNotSupportedOnView,
@@ -1553,10 +1564,6 @@ Status applyCommand_inlock(OperationContext* opCtx,
<< redact(op));
}
- // Applying commands in repl is done under Global W-lock, so it is safe to not
- // perform the current DB checks after reacquiring the lock.
- invariant(opCtx->lockState()->isW());
-
// Parse optime from oplog entry unless we are applying this command in standalone or on a
// primary (replicated writes enabled).
OpTime opTime;
@@ -1567,7 +1574,7 @@ Status applyCommand_inlock(OperationContext* opCtx,
}
}
- const bool assignCommandTimestamp = [opCtx, mode, &op] {
+ const bool assignCommandTimestamp = [opCtx, mode, &op, &o] {
const auto replMode = ReplicationCoordinator::get(opCtx)->getReplicationMode();
if (opCtx->writesAreReplicated()) {
// We do not assign timestamps on replicated writes since they will get their oplog
@@ -1575,6 +1582,11 @@ Status applyCommand_inlock(OperationContext* opCtx,
return false;
}
+ // Don't assign commit timestamp for transaction commands.
+ const StringData commandName(o.firstElementFieldName());
+ if (op.getBoolField("prepare") || commandName == "abortTransaction")
+ return false;
+
switch (replMode) {
case ReplicationCoordinator::modeReplSet: {
// The 'applyOps' command never logs 'applyOps' oplog entries with nested
@@ -1582,9 +1594,7 @@ Status applyCommand_inlock(OperationContext* opCtx,
// command on secondaries. Thus, the timestamps in the command oplog
// entries are always real timestamps from this oplog and we should
// timestamp our writes with them.
- //
- // However, if "prepare" is specified, don't assign commit timestamp.
- return !op.getBoolField("prepare");
+ return true;
}
case ReplicationCoordinator::modeNone: {
// Only assign timestamps on standalones during replication recovery when
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index ee5cea520f8..1ae37147ac6 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/commands/fsync.h"
#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/concurrency/lock_state.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
@@ -303,9 +304,16 @@ Status SyncTail::syncApply(OperationContext* opCtx,
});
} else if (opType == OpTypeEnum::kCommand) {
return writeConflictRetry(opCtx, "syncApply_command", nss.ns(), [&] {
- // a command may need a global write lock. so we will conservatively go
- // ahead and grab one here. suboptimal. :-(
- Lock::GlobalWrite globalWriteLock(opCtx);
+ // A command may need a global write lock. so we will conservatively go
+ // ahead and grab one for non-transaction commands.
+ // Transactions have to acquire the same locks on secondaries as on primary.
+ boost::optional<Lock::GlobalWrite> globalWriteLock;
+
+ // TODO SERVER-37180 Remove this double-parsing.
+ const StringData commandName(op["o"].embeddedObject().firstElementFieldName());
+ if (!op.getBoolField("prepare") && commandName != "abortTransaction") {
+ globalWriteLock.emplace(opCtx);
+ }
// special case apply for commands to avoid implicit database creation
Status status = applyCommand_inlock(opCtx, op, oplogApplicationMode);
@@ -1130,7 +1138,10 @@ Status multiSyncApply(OperationContext* opCtx,
UnreplicatedWritesBlock uwb(opCtx);
DisableDocumentValidation validationDisabler(opCtx);
- ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock(opCtx->lockState());
+ // Since we swap the locker in stash / unstash transaction resources,
+ // ShouldNotConflictWithSecondaryBatchApplicationBlock will touch the locker that has been
+ // destroyed by unstash in its destructor. Thus we set the flag explicitly.
+ opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
// Explicitly start future read transactions without a timestamp.
opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
@@ -1152,7 +1163,7 @@ Status multiSyncApply(OperationContext* opCtx,
MultikeyPathTracker::get(opCtx).startTrackingMultikeyPathInfo();
for (auto it = ops->cbegin(); it != ops->cend(); ++it) {
- const auto& entry = **it;
+ const OplogEntry& entry = **it;
// If we are successful in grouping and applying inserts, advance the current iterator
// past the end of the inserted group of entries.
@@ -1168,11 +1179,12 @@ Status multiSyncApply(OperationContext* opCtx,
// from disk may read that write, causing starting a new transaction on an existing
// txnNumber. Thus, we start a new transaction without refreshing state from disk.
boost::optional<OperationContextSessionMongodWithoutRefresh> sessionTxnState;
- if (entry.shouldPrepare()) {
- // Prepare transaction is in its own batch. We cannot modify the opCtx for other
- // ops.
+ if (entry.shouldPrepare() ||
+ (entry.isCommand() &&
+ entry.getCommandType() == OplogEntry::CommandType::kAbortTransaction)) {
// The update on transaction table may be scheduled to the same writer.
invariant(ops->size() <= 2);
+ // Transaction operations are in its own batch, so we can modify their opCtx.
invariant(entry.getSessionId());
invariant(entry.getTxnNumber());
opCtx->setLogicalSessionId(*entry.getSessionId());
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 8043df51d68..439bd7e7602 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -818,6 +818,7 @@ void execCommandDatabase(OperationContext* opCtx,
opCtx->getTxnNumber());
opCtx->lockState()->setSharedLocksShouldTwoPhaseLock(true);
+ opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
}
auto& oss = OperationShardingState::get(opCtx);
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 3278730d7ea..d848960eb42 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -260,9 +260,12 @@ void TransactionParticipant::beginOrContinue(TxnNumber txnNumber,
_beginMultiDocumentTransaction(lg, txnNumber);
}
-void TransactionParticipant::beginTransactionUnconditionally(TxnNumber txnNumber) {
+void TransactionParticipant::beginOrContinueTransactionUnconditionally(TxnNumber txnNumber) {
stdx::lock_guard<stdx::mutex> lg(_mutex);
- _beginMultiDocumentTransaction(lg, txnNumber);
+ // Continuing transaction unconditionally is a no-op since we don't check any on-disk state.
+ if (_activeTxnNumber != txnNumber) {
+ _beginMultiDocumentTransaction(lg, txnNumber);
+ }
}
void TransactionParticipant::setSpeculativeTransactionOpTime(
@@ -302,8 +305,13 @@ TransactionParticipant::OplogSlotReserver::OplogSlotReserver(OperationContext* o
// The new transaction should have an empty locker, and thus we do not need to save it.
invariant(opCtx->lockState()->getClientState() == Locker::ClientState::kInactive);
_locker = opCtx->swapLockState(stdx::make_unique<LockerImpl>());
+ // Inherit the locking setting from the original one.
+ opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(
+ _locker->shouldConflictWithSecondaryBatchApplication());
_locker->unsetThreadId();
+ // OplogSlotReserver is only used by primary, so always set max transaction lock timeout.
+ invariant(opCtx->writesAreReplicated());
// This thread must still respect the transaction lock timeout, since it can prevent the
// transaction from making progress.
auto maxTransactionLockMillis = maxTransactionLockRequestTimeoutMillis.load();
@@ -337,6 +345,9 @@ TransactionParticipant::TxnResources::TxnResources(OperationContext* opCtx, bool
opCtx->setWriteUnitOfWork(nullptr);
_locker = opCtx->swapLockState(stdx::make_unique<LockerImpl>());
+ // Inherit the locking setting from the original one.
+ opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(
+ _locker->shouldConflictWithSecondaryBatchApplication());
if (!keepTicket) {
_locker->releaseTicket();
}
@@ -345,10 +356,13 @@ TransactionParticipant::TxnResources::TxnResources(OperationContext* opCtx, bool
// This thread must still respect the transaction lock timeout, since it can prevent the
// transaction from making progress.
auto maxTransactionLockMillis = maxTransactionLockRequestTimeoutMillis.load();
- if (maxTransactionLockMillis >= 0) {
+ if (opCtx->writesAreReplicated() && maxTransactionLockMillis >= 0) {
opCtx->lockState()->setMaxLockTimeout(Milliseconds(maxTransactionLockMillis));
}
+ // On secondaries, max lock timeout must not be set.
+ invariant(opCtx->writesAreReplicated() || !opCtx->lockState()->hasMaxLockTimeout());
+
_recoveryUnit = opCtx->releaseRecoveryUnit();
opCtx->setRecoveryUnit(std::unique_ptr<RecoveryUnit>(
opCtx->getServiceContext()->getStorageEngine()->newRecoveryUnit()),
@@ -504,10 +518,13 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx
// to acquire a lock. This is to avoid deadlocks and minimize non-transaction
// operation performance degradations.
auto maxTransactionLockMillis = maxTransactionLockRequestTimeoutMillis.load();
- if (maxTransactionLockMillis >= 0) {
+ if (opCtx->writesAreReplicated() && maxTransactionLockMillis >= 0) {
opCtx->lockState()->setMaxLockTimeout(Milliseconds(maxTransactionLockMillis));
}
+ // On secondaries, max lock timeout must not be set.
+ invariant(opCtx->writesAreReplicated() || !opCtx->lockState()->hasMaxLockTimeout());
+
stdx::lock_guard<stdx::mutex> lm(_metricsMutex);
_transactionMetricsObserver.onUnstash(ServerTransactionsMetrics::get(opCtx),
curTimeMicros64());
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index d287a548ee5..70042946744 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -352,7 +352,7 @@ public:
boost::optional<bool> autocommit,
boost::optional<bool> startTransaction);
- void beginTransactionUnconditionally(TxnNumber txnNumber);
+ void beginOrContinueTransactionUnconditionally(TxnNumber txnNumber);
static Status isValid(StringData dbName, StringData cmdName);