summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@10gen.com>2018-03-23 15:48:56 -0400
committerMatthew Russotto <matthew.russotto@10gen.com>2018-03-25 23:34:35 -0400
commit2f853ce75cd1fb641bf4777368e5f0d51b9d0025 (patch)
treef344c55aa5eb2cc89c934f229cacdb1efa1a6747
parentc9f2d8059206de3ded12beca2b08a9be3d1f9b13 (diff)
downloadmongo-2f853ce75cd1fb641bf4777368e5f0d51b9d0025.tar.gz
SERVER-34068 Unify commit machinery between snapshot read and multi-document transaction
-rw-r--r--src/mongo/db/commands/txn_cmds.cpp6
-rw-r--r--src/mongo/db/op_observer_impl.cpp1
-rw-r--r--src/mongo/db/repl/do_txn.cpp8
-rw-r--r--src/mongo/db/session.cpp91
-rw-r--r--src/mongo/db/session.h21
-rw-r--r--src/mongo/db/session_test.cpp53
6 files changed, 139 insertions, 41 deletions
diff --git a/src/mongo/db/commands/txn_cmds.cpp b/src/mongo/db/commands/txn_cmds.cpp
index a76b1a794d3..9b3f9aeae08 100644
--- a/src/mongo/db/commands/txn_cmds.cpp
+++ b/src/mongo/db/commands/txn_cmds.cpp
@@ -78,11 +78,7 @@ public:
"Transaction isn't in progress",
session->inMultiDocumentTransaction());
- auto opObserver = opCtx->getServiceContext()->getOpObserver();
- invariant(opObserver);
- opObserver->onTransactionCommit(opCtx);
- opCtx->getWriteUnitOfWork()->commit();
- opCtx->setWriteUnitOfWork(nullptr);
+ session->commitTransaction(opCtx);
return true;
}
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 9587fcb0e46..1e7f3e67db7 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -840,7 +840,6 @@ void OpObserverImpl::onTransactionCommit(OperationContext* opCtx) {
invariant(opCtx->getTxnNumber());
Session* const session = OperationContextSession::get(opCtx);
invariant(session);
- invariant(session->inMultiDocumentTransaction());
auto stmts = session->endTransactionAndRetrieveOperations();
// It is possible that the transaction resulted in no changes. In that case, we should
diff --git a/src/mongo/db/repl/do_txn.cpp b/src/mongo/db/repl/do_txn.cpp
index 24df2f02676..cfb968c71e6 100644
--- a/src/mongo/db/repl/do_txn.cpp
+++ b/src/mongo/db/repl/do_txn.cpp
@@ -312,14 +312,10 @@ Status doTxn(OperationContext* opCtx,
numApplied = 0;
uassertStatusOK(_doTxn(opCtx, dbName, doTxnCmd, &intermediateResult, &numApplied));
- auto opObserver = getGlobalServiceContext()->getOpObserver();
- invariant(opObserver);
- opObserver->onTransactionCommit(opCtx);
+ session->commitTransaction(opCtx);
result->appendElements(intermediateResult.obj());
-
- // Commit the global WUOW if the command succeeds.
- opCtx->getWriteUnitOfWork()->commit();
} catch (const DBException& ex) {
+ session->abortActiveTransaction(opCtx);
BSONArrayBuilder ab;
++numApplied;
for (int j = 0; j < numApplied; j++)
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index 703035781fd..305903d8011 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -575,7 +575,7 @@ void Session::stashTransactionResources(OperationContext* opCtx) {
// access Session state. We must lock the Client before the Session mutex, since the Client
// effectively owns the Session. That is, a user might lock the Client to ensure it doesn't go
// away, and then lock the Session owned by that client. We rely on the fact that we are not
- // using the DefaultLockerImpl to avoid deadlock.
+ // using the DefaultLockerImpl to avoid deadlock.
invariant(!isMMAPV1());
stdx::lock_guard<Client> lk(*opCtx->getClient());
@@ -602,13 +602,7 @@ void Session::stashTransactionResources(OperationContext* opCtx) {
if (_txnState == MultiDocumentTransactionState::kInSnapshotRead && !opCtx->hasStashedCursor()) {
// The snapshot read is complete.
invariant(opCtx->getWriteUnitOfWork());
- // We cannot hold the session lock during the commit, or a deadlock results.
- _txnState = MultiDocumentTransactionState::kCommitting;
- lg.unlock();
- opCtx->getWriteUnitOfWork()->commit();
- opCtx->setWriteUnitOfWork(nullptr);
- lg.lock();
- _txnState = MultiDocumentTransactionState::kCommitted;
+ _commitTransaction(std::move(lg), opCtx);
return;
}
@@ -655,10 +649,15 @@ void Session::unstashTransactionResources(OperationContext* opCtx) {
}
if (_txnResourceStash) {
+ // Transaction resources already exist for this transaction. Transfer them from the
+ // stash to the operation context.
invariant(_txnState != MultiDocumentTransactionState::kNone);
_txnResourceStash->release(opCtx);
_txnResourceStash = boost::none;
} else {
+ // Stashed transaction resources do not exist for this transaction. If this is a
+ // snapshot read or a multi-document transaction, set up the transaction resources on
+ // the opCtx.
auto readConcernArgs = repl::ReadConcernArgs::get(opCtx);
if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern ||
_txnState == MultiDocumentTransactionState::kInProgress) {
@@ -710,7 +709,10 @@ void Session::abortActiveTransaction(OperationContext* opCtx) {
void Session::_abortTransaction(WithLock wl) {
// TODO SERVER-33432 Disallow aborting committed transaction after we implement implicit abort.
- if (_txnState == MultiDocumentTransactionState::kCommitted) {
+ // A transaction in kCommitting state will either commit or abort for storage-layer reasons;
+ // it is too late to abort externally.
+ if (_txnState == MultiDocumentTransactionState::kCommitting ||
+ _txnState == MultiDocumentTransactionState::kCommitted) {
return;
}
_txnResourceStash = boost::none;
@@ -742,30 +744,71 @@ void Session::addTransactionOperation(OperationContext* opCtx,
invariant(_txnState == MultiDocumentTransactionState::kInProgress);
invariant(!_autocommit && _activeTxnNumber != kUninitializedTxnNumber);
invariant(opCtx->lockState()->inAWriteUnitOfWork());
- if (_transactionOperations.empty()) {
- auto txnNumberCompleting = _activeTxnNumber;
- opCtx->recoveryUnit()->onCommit([this, txnNumberCompleting] {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(_activeTxnNumber == txnNumberCompleting);
- invariant(_txnState == MultiDocumentTransactionState::kCommitting ||
- _txnState == MultiDocumentTransactionState::kCommitted);
- _txnState = MultiDocumentTransactionState::kCommitted;
- });
- }
_transactionOperations.push_back(operation);
}
std::vector<repl::ReplOperation> Session::endTransactionAndRetrieveOperations() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(!_autocommit);
- invariant(_txnState == MultiDocumentTransactionState::kInProgress);
- // If _transactionOperations is empty, we will not see a commit because the write unit
- // of work is empty.
- _txnState = _transactionOperations.empty() ? MultiDocumentTransactionState::kCommitted
- : MultiDocumentTransactionState::kCommitting;
return std::move(_transactionOperations);
}
+void Session::commitTransaction(OperationContext* opCtx) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ if (opCtx->getTxnNumber() != _activeTxnNumber) {
+ uasserted(ErrorCodes::TransactionAborted,
+ str::stream() << "Transaction aborted. Active txnNumber is now "
+ << _activeTxnNumber);
+ }
+ if (_txnState == MultiDocumentTransactionState::kCommitted)
+ return;
+ _commitTransaction(std::move(lk), opCtx);
+}
+
+void Session::_commitTransaction(stdx::unique_lock<stdx::mutex> lk, OperationContext* opCtx) {
+ invariant(_txnState == MultiDocumentTransactionState::kInProgress ||
+ _txnState == MultiDocumentTransactionState::kInSnapshotRead);
+ const bool isMultiDocumentTransaction = _txnState == MultiDocumentTransactionState::kInProgress;
+ if (isMultiDocumentTransaction) {
+ // We need to unlock the session to run the opObserver onTransactionCommit, which calls back
+ // into the session.
+ lk.unlock();
+ auto opObserver = opCtx->getServiceContext()->getOpObserver();
+ invariant(opObserver);
+ opObserver->onTransactionCommit(opCtx);
+ lk.lock();
+ // It's possible some other thread aborted the transaction (e.g. through killSession)
+ // while the opObserver was running. If that happened, the commit should be reported
+ // as failed.
+ uassert(ErrorCodes::TransactionAborted,
+ str::stream() << "Transaction " << opCtx->getTxnNumber()
+ << " aborted while attempting to commit",
+ _txnState == MultiDocumentTransactionState::kInProgress &&
+ _activeTxnNumber == opCtx->getTxnNumber());
+ }
+ _txnState = MultiDocumentTransactionState::kCommitting;
+ bool committed = false;
+ ON_BLOCK_EXIT([this, &committed, opCtx]() {
+ // If we're still "committing", the recovery unit failed to commit, and the lock
+ // is not held. We can't safely use _txnState here, as it is protected by the lock.
+ if (!committed) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ opCtx->setWriteUnitOfWork(nullptr);
+ // Make sure the transaction didn't change because of chunk migration.
+ if (opCtx->getTxnNumber() == _activeTxnNumber) {
+ _txnState = MultiDocumentTransactionState::kAborted;
+ }
+ }
+ _commitcv.notify_all();
+ });
+ lk.unlock();
+ opCtx->getWriteUnitOfWork()->commit();
+ opCtx->setWriteUnitOfWork(nullptr);
+ committed = true;
+ lk.lock();
+ _txnState = MultiDocumentTransactionState::kCommitted;
+}
+
void Session::_checkValid(WithLock) const {
uassert(ErrorCodes::ConflictingOperationInProgress,
str::stream() << "Session " << getSessionId()
diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h
index 25361f74f38..3e2af4b96cb 100644
--- a/src/mongo/db/session.h
+++ b/src/mongo/db/session.h
@@ -229,6 +229,12 @@ public:
void abortIfSnapshotRead(TxnNumber txnNumber);
/**
+ * Commits the transaction, including committing the write unit of work and updating
+ * transaction state.
+ */
+ void commitTransaction(OperationContext* opCtx);
+
+ /**
* Aborts the transaction outside the transaction, releasing transaction resources.
*/
void abortArbitraryTransaction();
@@ -262,6 +268,16 @@ public:
_txnState == MultiDocumentTransactionState::kInSnapshotRead;
}
+ bool transactionIsCommitted() const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _txnState == MultiDocumentTransactionState::kCommitted;
+ }
+
+ bool transactionIsAborted() const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _txnState == MultiDocumentTransactionState::kAborted;
+ }
+
/**
* Adds a stored operation to the list of stored operations for the current multi-document
* (non-autocommit) transaction. It is illegal to add operations when no multi-document
@@ -320,11 +336,16 @@ private:
// Releases stashed transaction resources to abort the transaction.
void _abortTransaction(WithLock);
+ void _commitTransaction(stdx::unique_lock<stdx::mutex> lk, OperationContext* opCtx);
+
const LogicalSessionId _sessionId;
// Protects the member variables below.
mutable stdx::mutex _mutex;
+ // Condition variable notified when we finish an attempt to commit the global WUOW.
+ stdx::condition_variable _commitcv;
+
// Specifies whether the session information needs to be refreshed from storage
bool _isValid{false};
diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp
index 5fa54698d0b..004e554d9e0 100644
--- a/src/mongo/db/session_test.cpp
+++ b/src/mongo/db/session_test.cpp
@@ -586,8 +586,8 @@ TEST_F(SessionTest, StashAndUnstashResources) {
ASSERT_EQUALS(originalRecoveryUnit, opCtx()->recoveryUnit());
ASSERT(opCtx()->getWriteUnitOfWork());
- // Commit the WriteUnitOfWork. This allows us to release locks.
- opCtx()->getWriteUnitOfWork()->commit();
+ // Commit the transaction. This allows us to release locks.
+ session.commitTransaction(opCtx());
}
TEST_F(SessionTest, CheckAutocommitOnlyAllowedAtBeginningOfTxn) {
@@ -634,14 +634,57 @@ TEST_F(SessionTest, AbortClearsStoredStatements) {
session.refreshFromStorageIfNeeded(opCtx());
const TxnNumber txnNum = 24;
+ opCtx()->setLogicalSessionId(sessionId);
+ opCtx()->setTxnNumber(txnNum);
session.beginOrContinueTxn(opCtx(), txnNum, false);
-
- WriteUnitOfWork wuow(opCtx());
+ session.unstashTransactionResources(opCtx());
auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
session.addTransactionOperation(opCtx(), operation);
ASSERT_BSONOBJ_EQ(operation.toBSON(), session.transactionOperationsForTest()[0].toBSON());
- session.abortActiveTransaction(opCtx());
+ // The transaction machinery cannot store an empty locker.
+ { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now()); }
+ session.stashTransactionResources(opCtx());
+ session.abortArbitraryTransaction();
ASSERT_TRUE(session.transactionOperationsForTest().empty());
+ ASSERT_TRUE(session.transactionIsAborted());
+}
+
+// This test makes sure the commit machinery works even when no operations are done on the
+// transaction.
+TEST_F(SessionTest, EmptyTransactionCommit) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
+
+ const TxnNumber txnNum = 25;
+ opCtx()->setLogicalSessionId(sessionId);
+ opCtx()->setTxnNumber(txnNum);
+ session.beginOrContinueTxn(opCtx(), txnNum, false);
+ session.unstashTransactionResources(opCtx());
+ // The transaction machinery cannot store an empty locker.
+ Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now());
+ session.commitTransaction(opCtx());
+ session.stashTransactionResources(opCtx());
+ ASSERT_TRUE(session.transactionIsCommitted());
+}
+
+// This test makes sure the abort machinery works even when no operations are done on the
+// transaction.
+TEST_F(SessionTest, EmptyTransactionAbort) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
+
+ const TxnNumber txnNum = 26;
+ opCtx()->setLogicalSessionId(sessionId);
+ opCtx()->setTxnNumber(txnNum);
+ session.beginOrContinueTxn(opCtx(), txnNum, false);
+ session.unstashTransactionResources(opCtx());
+ // The transaction machinery cannot store an empty locker.
+ { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now()); }
+ session.stashTransactionResources(opCtx());
+ session.abortArbitraryTransaction();
+ ASSERT_TRUE(session.transactionIsAborted());
}
} // namespace