diff options
author | Matthew Russotto <matthew.russotto@10gen.com> | 2018-03-23 15:48:56 -0400 |
---|---|---|
committer | Matthew Russotto <matthew.russotto@10gen.com> | 2018-03-25 23:34:35 -0400 |
commit | 2f853ce75cd1fb641bf4777368e5f0d51b9d0025 (patch) | |
tree | f344c55aa5eb2cc89c934f229cacdb1efa1a6747 | |
parent | c9f2d8059206de3ded12beca2b08a9be3d1f9b13 (diff) | |
download | mongo-2f853ce75cd1fb641bf4777368e5f0d51b9d0025.tar.gz |
SERVER-34068 Unify commit machinery between snapshot read and multi-document transaction
-rw-r--r-- | src/mongo/db/commands/txn_cmds.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/do_txn.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/session.cpp | 91 | ||||
-rw-r--r-- | src/mongo/db/session.h | 21 | ||||
-rw-r--r-- | src/mongo/db/session_test.cpp | 53 |
6 files changed, 139 insertions, 41 deletions
diff --git a/src/mongo/db/commands/txn_cmds.cpp b/src/mongo/db/commands/txn_cmds.cpp index a76b1a794d3..9b3f9aeae08 100644 --- a/src/mongo/db/commands/txn_cmds.cpp +++ b/src/mongo/db/commands/txn_cmds.cpp @@ -78,11 +78,7 @@ public: "Transaction isn't in progress", session->inMultiDocumentTransaction()); - auto opObserver = opCtx->getServiceContext()->getOpObserver(); - invariant(opObserver); - opObserver->onTransactionCommit(opCtx); - opCtx->getWriteUnitOfWork()->commit(); - opCtx->setWriteUnitOfWork(nullptr); + session->commitTransaction(opCtx); return true; } diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 9587fcb0e46..1e7f3e67db7 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -840,7 +840,6 @@ void OpObserverImpl::onTransactionCommit(OperationContext* opCtx) { invariant(opCtx->getTxnNumber()); Session* const session = OperationContextSession::get(opCtx); invariant(session); - invariant(session->inMultiDocumentTransaction()); auto stmts = session->endTransactionAndRetrieveOperations(); // It is possible that the transaction resulted in no changes. In that case, we should diff --git a/src/mongo/db/repl/do_txn.cpp b/src/mongo/db/repl/do_txn.cpp index 24df2f02676..cfb968c71e6 100644 --- a/src/mongo/db/repl/do_txn.cpp +++ b/src/mongo/db/repl/do_txn.cpp @@ -312,14 +312,10 @@ Status doTxn(OperationContext* opCtx, numApplied = 0; uassertStatusOK(_doTxn(opCtx, dbName, doTxnCmd, &intermediateResult, &numApplied)); - auto opObserver = getGlobalServiceContext()->getOpObserver(); - invariant(opObserver); - opObserver->onTransactionCommit(opCtx); + session->commitTransaction(opCtx); result->appendElements(intermediateResult.obj()); - - // Commit the global WUOW if the command succeeds. - opCtx->getWriteUnitOfWork()->commit(); } catch (const DBException& ex) { + session->abortActiveTransaction(opCtx); BSONArrayBuilder ab; ++numApplied; for (int j = 0; j < numApplied; j++) diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index 703035781fd..305903d8011 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -575,7 +575,7 @@ void Session::stashTransactionResources(OperationContext* opCtx) { // access Session state. We must lock the Client before the Session mutex, since the Client // effectively owns the Session. That is, a user might lock the Client to ensure it doesn't go // away, and then lock the Session owned by that client. We rely on the fact that we are not - // using the DefaultLockerImpl to avoid deadlock. + // using the DefaultLockerImpl to avoid deadlock. invariant(!isMMAPV1()); stdx::lock_guard<Client> lk(*opCtx->getClient()); @@ -602,13 +602,7 @@ void Session::stashTransactionResources(OperationContext* opCtx) { if (_txnState == MultiDocumentTransactionState::kInSnapshotRead && !opCtx->hasStashedCursor()) { // The snapshot read is complete. invariant(opCtx->getWriteUnitOfWork()); - // We cannot hold the session lock during the commit, or a deadlock results. - _txnState = MultiDocumentTransactionState::kCommitting; - lg.unlock(); - opCtx->getWriteUnitOfWork()->commit(); - opCtx->setWriteUnitOfWork(nullptr); - lg.lock(); - _txnState = MultiDocumentTransactionState::kCommitted; + _commitTransaction(std::move(lg), opCtx); return; } @@ -655,10 +649,15 @@ void Session::unstashTransactionResources(OperationContext* opCtx) { } if (_txnResourceStash) { + // Transaction resources already exist for this transaction. Transfer them from the + // stash to the operation context. invariant(_txnState != MultiDocumentTransactionState::kNone); _txnResourceStash->release(opCtx); _txnResourceStash = boost::none; } else { + // Stashed transaction resources do not exist for this transaction. If this is a + // snapshot read or a multi-document transaction, set up the transaction resources on + // the opCtx. auto readConcernArgs = repl::ReadConcernArgs::get(opCtx); if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern || _txnState == MultiDocumentTransactionState::kInProgress) { @@ -710,7 +709,10 @@ void Session::abortActiveTransaction(OperationContext* opCtx) { void Session::_abortTransaction(WithLock wl) { // TODO SERVER-33432 Disallow aborting committed transaction after we implement implicit abort. - if (_txnState == MultiDocumentTransactionState::kCommitted) { + // A transaction in kCommitting state will either commit or abort for storage-layer reasons; + // it is too late to abort externally. + if (_txnState == MultiDocumentTransactionState::kCommitting || + _txnState == MultiDocumentTransactionState::kCommitted) { return; } _txnResourceStash = boost::none; @@ -742,30 +744,71 @@ void Session::addTransactionOperation(OperationContext* opCtx, invariant(_txnState == MultiDocumentTransactionState::kInProgress); invariant(!_autocommit && _activeTxnNumber != kUninitializedTxnNumber); invariant(opCtx->lockState()->inAWriteUnitOfWork()); - if (_transactionOperations.empty()) { - auto txnNumberCompleting = _activeTxnNumber; - opCtx->recoveryUnit()->onCommit([this, txnNumberCompleting] { - stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(_activeTxnNumber == txnNumberCompleting); - invariant(_txnState == MultiDocumentTransactionState::kCommitting || - _txnState == MultiDocumentTransactionState::kCommitted); - _txnState = MultiDocumentTransactionState::kCommitted; - }); - } _transactionOperations.push_back(operation); } std::vector<repl::ReplOperation> Session::endTransactionAndRetrieveOperations() { stdx::lock_guard<stdx::mutex> lk(_mutex); invariant(!_autocommit); - invariant(_txnState == MultiDocumentTransactionState::kInProgress); - // If _transactionOperations is empty, we will not see a commit because the write unit - // of work is empty. - _txnState = _transactionOperations.empty() ? MultiDocumentTransactionState::kCommitted - : MultiDocumentTransactionState::kCommitting; return std::move(_transactionOperations); } +void Session::commitTransaction(OperationContext* opCtx) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + if (opCtx->getTxnNumber() != _activeTxnNumber) { + uasserted(ErrorCodes::TransactionAborted, + str::stream() << "Transaction aborted. Active txnNumber is now " + << _activeTxnNumber); + } + if (_txnState == MultiDocumentTransactionState::kCommitted) + return; + _commitTransaction(std::move(lk), opCtx); +} + +void Session::_commitTransaction(stdx::unique_lock<stdx::mutex> lk, OperationContext* opCtx) { + invariant(_txnState == MultiDocumentTransactionState::kInProgress || + _txnState == MultiDocumentTransactionState::kInSnapshotRead); + const bool isMultiDocumentTransaction = _txnState == MultiDocumentTransactionState::kInProgress; + if (isMultiDocumentTransaction) { + // We need to unlock the session to run the opObserver onTransactionCommit, which calls back + // into the session. + lk.unlock(); + auto opObserver = opCtx->getServiceContext()->getOpObserver(); + invariant(opObserver); + opObserver->onTransactionCommit(opCtx); + lk.lock(); + // It's possible some other thread aborted the transaction (e.g. through killSession) + // while the opObserver was running. If that happened, the commit should be reported + // as failed. + uassert(ErrorCodes::TransactionAborted, + str::stream() << "Transaction " << opCtx->getTxnNumber() + << " aborted while attempting to commit", + _txnState == MultiDocumentTransactionState::kInProgress && + _activeTxnNumber == opCtx->getTxnNumber()); + } + _txnState = MultiDocumentTransactionState::kCommitting; + bool committed = false; + ON_BLOCK_EXIT([this, &committed, opCtx]() { + // If we're still "committing", the recovery unit failed to commit, and the lock + // is not held. We can't safely use _txnState here, as it is protected by the lock. + if (!committed) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + opCtx->setWriteUnitOfWork(nullptr); + // Make sure the transaction didn't change because of chunk migration. + if (opCtx->getTxnNumber() == _activeTxnNumber) { + _txnState = MultiDocumentTransactionState::kAborted; + } + } + _commitcv.notify_all(); + }); + lk.unlock(); + opCtx->getWriteUnitOfWork()->commit(); + opCtx->setWriteUnitOfWork(nullptr); + committed = true; + lk.lock(); + _txnState = MultiDocumentTransactionState::kCommitted; +} + void Session::_checkValid(WithLock) const { uassert(ErrorCodes::ConflictingOperationInProgress, str::stream() << "Session " << getSessionId() diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h index 25361f74f38..3e2af4b96cb 100644 --- a/src/mongo/db/session.h +++ b/src/mongo/db/session.h @@ -229,6 +229,12 @@ public: void abortIfSnapshotRead(TxnNumber txnNumber); /** + * Commits the transaction, including committing the write unit of work and updating + * transaction state. + */ + void commitTransaction(OperationContext* opCtx); + + /** * Aborts the transaction outside the transaction, releasing transaction resources. */ void abortArbitraryTransaction(); @@ -262,6 +268,16 @@ public: _txnState == MultiDocumentTransactionState::kInSnapshotRead; } + bool transactionIsCommitted() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _txnState == MultiDocumentTransactionState::kCommitted; + } + + bool transactionIsAborted() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _txnState == MultiDocumentTransactionState::kAborted; + } + /** * Adds a stored operation to the list of stored operations for the current multi-document * (non-autocommit) transaction. It is illegal to add operations when no multi-document @@ -320,11 +336,16 @@ private: // Releases stashed transaction resources to abort the transaction. void _abortTransaction(WithLock); + void _commitTransaction(stdx::unique_lock<stdx::mutex> lk, OperationContext* opCtx); + const LogicalSessionId _sessionId; // Protects the member variables below. mutable stdx::mutex _mutex; + // Condition variable notified when we finish an attempt to commit the global WUOW. + stdx::condition_variable _commitcv; + // Specifies whether the session information needs to be refreshed from storage bool _isValid{false}; diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp index 5fa54698d0b..004e554d9e0 100644 --- a/src/mongo/db/session_test.cpp +++ b/src/mongo/db/session_test.cpp @@ -586,8 +586,8 @@ TEST_F(SessionTest, StashAndUnstashResources) { ASSERT_EQUALS(originalRecoveryUnit, opCtx()->recoveryUnit()); ASSERT(opCtx()->getWriteUnitOfWork()); - // Commit the WriteUnitOfWork. This allows us to release locks. - opCtx()->getWriteUnitOfWork()->commit(); + // Commit the transaction. This allows us to release locks. + session.commitTransaction(opCtx()); } TEST_F(SessionTest, CheckAutocommitOnlyAllowedAtBeginningOfTxn) { @@ -634,14 +634,57 @@ TEST_F(SessionTest, AbortClearsStoredStatements) { session.refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 24; + opCtx()->setLogicalSessionId(sessionId); + opCtx()->setTxnNumber(txnNum); session.beginOrContinueTxn(opCtx(), txnNum, false); - - WriteUnitOfWork wuow(opCtx()); + session.unstashTransactionResources(opCtx()); auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); session.addTransactionOperation(opCtx(), operation); ASSERT_BSONOBJ_EQ(operation.toBSON(), session.transactionOperationsForTest()[0].toBSON()); - session.abortActiveTransaction(opCtx()); + // The transaction machinery cannot store an empty locker. + { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now()); } + session.stashTransactionResources(opCtx()); + session.abortArbitraryTransaction(); ASSERT_TRUE(session.transactionOperationsForTest().empty()); + ASSERT_TRUE(session.transactionIsAborted()); +} + +// This test makes sure the commit machinery works even when no operations are done on the +// transaction. +TEST_F(SessionTest, EmptyTransactionCommit) { + const auto sessionId = makeLogicalSessionIdForTest(); + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); + + const TxnNumber txnNum = 25; + opCtx()->setLogicalSessionId(sessionId); + opCtx()->setTxnNumber(txnNum); + session.beginOrContinueTxn(opCtx(), txnNum, false); + session.unstashTransactionResources(opCtx()); + // The transaction machinery cannot store an empty locker. + Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now()); + session.commitTransaction(opCtx()); + session.stashTransactionResources(opCtx()); + ASSERT_TRUE(session.transactionIsCommitted()); +} + +// This test makes sure the abort machinery works even when no operations are done on the +// transaction. +TEST_F(SessionTest, EmptyTransactionAbort) { + const auto sessionId = makeLogicalSessionIdForTest(); + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); + + const TxnNumber txnNum = 26; + opCtx()->setLogicalSessionId(sessionId); + opCtx()->setTxnNumber(txnNum); + session.beginOrContinueTxn(opCtx(), txnNum, false); + session.unstashTransactionResources(opCtx()); + // The transaction machinery cannot store an empty locker. + { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now()); } + session.stashTransactionResources(opCtx()); + session.abortArbitraryTransaction(); + ASSERT_TRUE(session.transactionIsAborted()); } } // namespace |