summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorSiyuan Zhou <siyuan.zhou@mongodb.com>2018-03-13 19:50:49 -0400
committerSiyuan Zhou <siyuan.zhou@mongodb.com>2018-03-15 18:51:03 -0400
commitb762003b0b692fe78658e675f06495f132af442c (patch)
treeae6acbc0270ec93adfb8f9654ea830839b42dc6e /src/mongo/db
parentd0dea93d632ce451b66b57b538e66f3e1378b049 (diff)
downloadmongo-b762003b0b692fe78658e675f06495f132af442c.tar.gz
SERVER-33591 Cleanup interaction of write concerns and transactions
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/do_txn_test.cpp2
-rw-r--r--src/mongo/db/service_entry_point_common.cpp38
-rw-r--r--src/mongo/db/session.cpp4
-rw-r--r--src/mongo/db/session_catalog.cpp33
-rw-r--r--src/mongo/db/session_catalog.h17
-rw-r--r--src/mongo/db/session_catalog_test.cpp122
6 files changed, 54 insertions, 162 deletions
diff --git a/src/mongo/db/repl/do_txn_test.cpp b/src/mongo/db/repl/do_txn_test.cpp
index f1a871c40c5..c9d940ee485 100644
--- a/src/mongo/db/repl/do_txn_test.cpp
+++ b/src/mongo/db/repl/do_txn_test.cpp
@@ -127,7 +127,7 @@ void DoTxnTest::setUp() {
_opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
_opCtx->setTxnNumber(0); // TxnNumber can always be 0 because we have a new session.
_ocs.emplace(_opCtx.get(), true /* checkOutSession */, false /* autocommit */);
- _ocs->unstashTransactionResources();
+ OperationContextSession::get(opCtx())->unstashTransactionResources(opCtx());
}
void DoTxnTest::tearDown() {
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index ddc3829e6f6..64f38e60008 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -407,6 +407,35 @@ LogicalTime computeOperationTime(OperationContext* opCtx,
return operationTime;
}
+void invokeInTransaction(OperationContext* opCtx,
+ CommandInvocation* invocation,
+ CommandReplyBuilder* replyBuilder) {
+ // Only get the session if it's at top nesting level.
+ const bool topLevelOnly = true;
+ auto session = OperationContextSession::get(opCtx, topLevelOnly);
+ if (!session) {
+ // Run the command directly if we're not in a transaction.
+ invocation->run(opCtx, replyBuilder);
+ return;
+ }
+
+ session->unstashTransactionResources(opCtx);
+
+ // TODO: SERVER-33217 Add an RAII so that any exception will abort the transaction.
+ invocation->run(opCtx, replyBuilder);
+
+ if (auto okField = replyBuilder->getBodyBuilder().asTempObj()["ok"]) {
+ // If ok is present, use its truthiness.
+ if (!okField.trueValue()) {
+ // TODO: SERVER-33217 Abort the transaction if the command fails.
+ return;
+ }
+ }
+
+ // Stash or commit the transaction when the command succeeds.
+ session->stashTransactionResources(opCtx);
+}
+
bool runCommandImpl(OperationContext* opCtx,
CommandInvocation* invocation,
const OpMsgRequest& request,
@@ -428,7 +457,7 @@ bool runCommandImpl(OperationContext* opCtx,
if (!invocation->supportsWriteConcern()) {
behaviors.uassertCommandDoesNotSpecifyWriteConcern(request.body);
- invocation->run(opCtx, &crb);
+ invokeInTransaction(opCtx, invocation, &crb);
} else {
auto wcResult = uassertStatusOK(extractWriteConcern(opCtx, request.body));
@@ -442,7 +471,7 @@ bool runCommandImpl(OperationContext* opCtx,
behaviors.waitForWriteConcern(
opCtx, invocation->definition()->getName(), lastOpBeforeRun, crb.getBodyBuilder());
});
- invocation->run(opCtx, &crb);
+ invokeInTransaction(opCtx, invocation, &crb);
// Nothing in run() should change the writeConcern.
dassert(SimpleBSONObjComparator::kInstance.evaluate(opCtx->getWriteConcern().toBSON() ==
@@ -699,13 +728,10 @@ void execCommandDatabase(OperationContext* opCtx,
behaviors.waitForReadConcern(opCtx, invocation.get(), request);
- sessionTxnState.unstashTransactionResources();
retval = runCommandImpl(
opCtx, invocation.get(), request, replyBuilder, startOperationTime, behaviors);
- if (retval) {
- sessionTxnState.stashTransactionResources();
- } else {
+ if (!retval) {
command->incrementCommandsFailed();
}
} catch (const DBException& e) {
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index db36244520e..cd6106a1346 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -556,6 +556,8 @@ void Session::TxnResources::release(OperationContext* opCtx) {
}
void Session::stashTransactionResources(OperationContext* opCtx) {
+ invariant(opCtx->getTxnNumber());
+
// We must lock the Client to change the Locker on the OperationContext and the Session mutex to
// access Session state. We must lock the Client before the Session mutex, since the Client
// effectively owns the Session. That is, a user might lock the Client to ensure it doesn't go
@@ -602,6 +604,8 @@ void Session::stashTransactionResources(OperationContext* opCtx) {
}
void Session::unstashTransactionResources(OperationContext* opCtx) {
+ invariant(opCtx->getTxnNumber());
+
// If the storage engine is mmapv1, it is not safe to lock both the Client and the Session
// mutex. This is fine because mmapv1 does not support transactions.
if (isMMAPV1()) {
diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp
index ba9a7edd958..fe78ad94dbe 100644
--- a/src/mongo/db/session_catalog.cpp
+++ b/src/mongo/db/session_catalog.cpp
@@ -343,37 +343,12 @@ OperationContextSession::~OperationContextSession() {
}
}
-void OperationContextSession::stashTransactionResources() {
- if (!_opCtx->getTxnNumber()) {
- return;
- }
-
- if (auto& checkedOutSession = operationSessionDecoration(_opCtx)) {
- if (checkedOutSession->checkOutNestingLevel == 1) {
- if (auto session = checkedOutSession->scopedSession.get()) {
- session->stashTransactionResources(_opCtx);
- }
- }
- }
-}
-
-void OperationContextSession::unstashTransactionResources() {
- if (!_opCtx->getTxnNumber()) {
- return;
- }
-
- if (auto& checkedOutSession = operationSessionDecoration(_opCtx)) {
- if (checkedOutSession->checkOutNestingLevel == 1) {
- if (auto session = checkedOutSession->scopedSession.get()) {
- session->unstashTransactionResources(_opCtx);
- }
- }
- }
-}
-
-Session* OperationContextSession::get(OperationContext* opCtx) {
+Session* OperationContextSession::get(OperationContext* opCtx, bool topLevelOnly) {
auto& checkedOutSession = operationSessionDecoration(opCtx);
if (checkedOutSession) {
+ if (topLevelOnly && checkedOutSession->checkOutNestingLevel != 1) {
+ return nullptr;
+ }
return checkedOutSession->scopedSession.get();
}
diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h
index ddcea4db9fb..d3b37c2183c 100644
--- a/src/mongo/db/session_catalog.h
+++ b/src/mongo/db/session_catalog.h
@@ -272,6 +272,9 @@ private:
* Scoped object, which checks out the session specified in the passed operation context and stores
* it for later access by the command. The session is installed at construction time and is removed
* at destruction.
+ *
+ * Nested OperationContextSessions only check out the session once at the top level, but the checked
+ * out session is accessible via get() in inner scopes. This could happen due to DBDirectClient.
*/
class OperationContextSession {
MONGO_DISALLOW_COPYING(OperationContextSession);
@@ -283,19 +286,11 @@ public:
~OperationContextSession();
- static Session* get(OperationContext* opCtx);
-
- /**
- * Stash the Locker and RecoveryUnit if both:
- * - The current session represents a transaction running in snapshot isolation.
- * - The current operation is ending with an open client cursor.
- */
- void stashTransactionResources();
-
/**
- * Restore the stashed Locker and RecoveryUnit for the current transaction, if they exist.
+ * Returns the session checked out in the constructor. If "topLevelOnly" is true, it returns
+ * the session when it's at the top nesting level, and nullptr otherwise.
*/
- void unstashTransactionResources();
+ static Session* get(OperationContext* opCtx, bool topLevelOnly = false);
private:
OperationContext* const _opCtx;
diff --git a/src/mongo/db/session_catalog_test.cpp b/src/mongo/db/session_catalog_test.cpp
index a02203e5554..42fb7d5b321 100644
--- a/src/mongo/db/session_catalog_test.cpp
+++ b/src/mongo/db/session_catalog_test.cpp
@@ -77,36 +77,6 @@ TEST_F(SessionCatalogTest, OperationContextCheckedOutSession) {
auto session = OperationContextSession::get(opCtx());
ASSERT(session);
ASSERT_EQ(*opCtx()->getLogicalSessionId(), session->getSessionId());
- session->refreshFromStorageIfNeeded(opCtx());
- session->beginOrContinueTxn(opCtx(), txnNum, boost::none);
-
- // Set the readConcern level on the operation to snapshot. This ensures that unstash sets up a
- // WriteUnitOfWork on the OperationContext.
- repl::ReadConcernArgs readConcernArgs;
- ASSERT_OK(readConcernArgs.initialize(BSON("find"
- << "test"
- << repl::ReadConcernArgs::kReadConcernFieldName
- << BSON(repl::ReadConcernArgs::kLevelFieldName
- << "snapshot"))));
- repl::ReadConcernArgs::get(opCtx()) = readConcernArgs;
-
- // Confirm that unstash can be executed against a top-level checked-out Session.
- ocs.unstashTransactionResources();
-
- // Stashing requires we are holding locks and either have a stashed cursor or are in a
- // multi-statement transaction.
- opCtx()->setStashedCursor();
- Lock::GlobalRead lk(opCtx(), Date_t::now());
- ASSERT(lk.isLocked());
-
- // Confirm that stash can be executed against a top-level checked-out Session.
- ocs.stashTransactionResources();
-
- // TODO SERVER-33672: This can be removed when it no longer causes a hang to destroy the
- // SessionCatalog when a Session contains stashed transaction resources.
- repl::ReadConcernArgs::get(opCtx()) = repl::ReadConcernArgs();
- ocs.unstashTransactionResources();
- opCtx()->getWriteUnitOfWork()->commit();
}
TEST_F(SessionCatalogTest, OperationContextNonCheckedOutSession) {
@@ -116,11 +86,6 @@ TEST_F(SessionCatalogTest, OperationContextNonCheckedOutSession) {
auto session = OperationContextSession::get(opCtx());
ASSERT(!session);
-
- // Confirm that unstash can be executed against a top-level not-checked-out Session (this is a
- // noop). We do not expect stash to be executed against a top-level not-checked-out Session,
- // since we will not be in a snapshot read or multi-statement transaction.
- ocs.unstashTransactionResources();
}
TEST_F(SessionCatalogTest, GetOrCreateNonExistentSession) {
@@ -185,99 +150,26 @@ TEST_F(SessionCatalogTest, NestedOperationContextSession) {
ASSERT(!OperationContextSession::get(opCtx()));
}
-TEST_F(SessionCatalogTest, StashInNestedSessionIsANoop) {
+TEST_F(SessionCatalogTest, CannotAccessTopLevelSessionInNestedOnes) {
opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest());
opCtx()->setTxnNumber(1);
{
OperationContextSession outerScopedSession(opCtx(), true, boost::none);
- Locker* originalLocker = opCtx()->lockState();
- RecoveryUnit* originalRecoveryUnit = opCtx()->recoveryUnit();
- ASSERT(originalLocker);
- ASSERT(originalRecoveryUnit);
-
- // Set the readConcern on the OperationContext.
- repl::ReadConcernArgs readConcernArgs;
- ASSERT_OK(readConcernArgs.initialize(BSON("find"
- << "test"
- << repl::ReadConcernArgs::kReadConcernFieldName
- << BSON(repl::ReadConcernArgs::kLevelFieldName
- << "snapshot"))));
- repl::ReadConcernArgs::get(opCtx()) = readConcernArgs;
-
- // Perform initial unstash, which sets up a WriteUnitOfWork.
- outerScopedSession.unstashTransactionResources();
- ASSERT_EQUALS(originalLocker, opCtx()->lockState());
- ASSERT_EQUALS(originalRecoveryUnit, opCtx()->recoveryUnit());
- ASSERT(opCtx()->getWriteUnitOfWork());
-
+ auto* session = outerScopedSession.get(opCtx(), true);
+ ASSERT(session);
{
OperationContextSession innerScopedSession(opCtx(), true, boost::none);
- // Indicate that there is a stashed cursor. If we were not in a nested session, this
- // would ensure that stashing is not a noop.
- opCtx()->setStashedCursor();
-
- innerScopedSession.stashTransactionResources();
-
- // The stash was a noop, so the locker, RecoveryUnit, and WriteUnitOfWork on the
- // OperationContext are unaffected.
- ASSERT_EQUALS(originalLocker, opCtx()->lockState());
- ASSERT_EQUALS(originalRecoveryUnit, opCtx()->recoveryUnit());
- ASSERT(opCtx()->getWriteUnitOfWork());
+ // Cannot get the top level session since we're a nested one.
+ const bool topLevelOnly = true;
+ auto* innerSession = OperationContextSession::get(opCtx(), topLevelOnly);
+ ASSERT(!innerSession);
}
}
}
-TEST_F(SessionCatalogTest, UnstashInNestedSessionIsANoop) {
- opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest());
- opCtx()->setTxnNumber(1);
-
- {
- OperationContextSession outerScopedSession(opCtx(), true, boost::none);
-
- Locker* originalLocker = opCtx()->lockState();
- RecoveryUnit* originalRecoveryUnit = opCtx()->recoveryUnit();
- ASSERT(originalLocker);
- ASSERT(originalRecoveryUnit);
-
- // Set the readConcern on the OperationContext.
- repl::ReadConcernArgs readConcernArgs;
- ASSERT_OK(readConcernArgs.initialize(BSON("find"
- << "test"
- << repl::ReadConcernArgs::kReadConcernFieldName
- << BSON(repl::ReadConcernArgs::kLevelFieldName
- << "snapshot"))));
- repl::ReadConcernArgs::get(opCtx()) = readConcernArgs;
-
- {
- OperationContextSession innerScopedSession(opCtx(), true, boost::none);
-
- innerScopedSession.unstashTransactionResources();
-
- // The unstash was a noop, so the OperationContext did not get a WriteUnitOfWork.
- ASSERT_EQUALS(originalLocker, opCtx()->lockState());
- ASSERT_EQUALS(originalRecoveryUnit, opCtx()->recoveryUnit());
- ASSERT_FALSE(opCtx()->getWriteUnitOfWork());
- }
- }
-}
-
-TEST_F(SessionCatalogTest, OnlyCheckOutSessionWithCheckOutSessionTrue) {
- opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest());
-
- {
- OperationContextSession ocs(opCtx(), true, boost::none);
- ASSERT(OperationContextSession::get(opCtx()));
- }
-
- {
- OperationContextSession ocs(opCtx(), false, boost::none);
- ASSERT(!OperationContextSession::get(opCtx()));
- }
-}
-
TEST_F(SessionCatalogTest, ScanSessions) {
std::vector<LogicalSessionId> lsids;
auto workerFn = [&](OperationContext* opCtx, Session* session) {