summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorAndy Schwerin <schwerin@mongodb.com>2019-01-10 15:01:36 -0500
committerAndy Schwerin <schwerin@mongodb.com>2019-01-10 15:01:36 -0500
commitfacdcf14cfec192876a373fc49163769215327d8 (patch)
treee461394175313e947f950b8b968e57afb808ff70 /src/mongo/db
parente276f7d9b5262f99ec374018c3d29c53b8f757e4 (diff)
downloadmongo-facdcf14cfec192876a373fc49163769215327d8.tar.gz
SERVER-38810 Simplify correct usage of locking in Session/SessionCatalog.
This patch introduces two new types for gaining access to sessions in the session catalog, based on the context of the access. It leaves a third way to access sessions, which is via OperationContextSession::get(). The new types are ObservableSession and SessionToKill. With this change, a thread may access a session in one of three ways: (1) by binding the session to an OperationContext for regular use, by instantiation of OperationContextSession or MongoDOperationContextSession, as before. (2) In the callback passed to SessionCatalog::scanSessions, in which case the callback is passed an ObservableSession that represents looking at a Session while the SessionCatalog mutex is locked and while the bound OperationContext's client mutex is held, if the session is currently checked out via methods (1) or (3). (3) By calling SessionCatalog::checkOutSessionForKill, which returns a KillableSession. This is used for cleaning a session up after it is marked for kill. This patch eliminates Session::_mutex, which is no longer required, as the SessionCatalog::_mutex and the Client provide all necessary mutual exclusion for the Session type itself.
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/kill_sessions_local.cpp68
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp12
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.cpp16
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp121
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination_test.cpp194
-rw-r--r--src/mongo/db/session.cpp86
-rw-r--r--src/mongo/db/session.h71
-rw-r--r--src/mongo/db/session_catalog.cpp101
-rw-r--r--src/mongo/db/session_catalog.h195
-rw-r--r--src/mongo/db/session_catalog_mongod.cpp32
-rw-r--r--src/mongo/db/session_catalog_test.cpp86
-rw-r--r--src/mongo/db/transaction_participant.cpp2
-rw-r--r--src/mongo/db/transaction_participant_test.cpp5
14 files changed, 515 insertions, 475 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 41bfd7cbaa8..5cd1763cf13 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -668,7 +668,6 @@ env.Library(
target='session_catalog',
source=[
'session_catalog.cpp',
- 'session.cpp',
],
LIBDEPS=[
'kill_sessions',
diff --git a/src/mongo/db/kill_sessions_local.cpp b/src/mongo/db/kill_sessions_local.cpp
index 56ffabf14cf..79b7283d4c3 100644
--- a/src/mongo/db/kill_sessions_local.cpp
+++ b/src/mongo/db/kill_sessions_local.cpp
@@ -55,17 +55,18 @@ namespace {
* context with 'reason' as the code.
* 3) Finish killing the selected and interrupted sessions through the 'killSessionFn'.
*/
-void killSessionsAction(OperationContext* opCtx,
- const SessionKiller::Matcher& matcher,
- const stdx::function<bool(Session*)>& filterFn,
- const stdx::function<void(Session*)>& killSessionFn,
- ErrorCodes::Error reason = ErrorCodes::Interrupted) {
+void killSessionsAction(
+ OperationContext* opCtx,
+ const SessionKiller::Matcher& matcher,
+ const stdx::function<bool(const ObservableSession&)>& filterFn,
+ const stdx::function<void(OperationContext*, const SessionToKill&)>& killSessionFn,
+ ErrorCodes::Error reason = ErrorCodes::Interrupted) {
const auto catalog = SessionCatalog::get(opCtx);
- std::vector<Session::KillToken> sessionKillTokens;
- catalog->scanSessions(matcher, [&](WithLock sessionCatalogLock, Session* session) {
+ std::vector<SessionCatalog::KillToken> sessionKillTokens;
+ catalog->scanSessions(matcher, [&](const ObservableSession& session) {
if (filterFn(session))
- sessionKillTokens.emplace_back(session->kill(sessionCatalogLock, reason));
+ sessionKillTokens.emplace_back(session.kill(reason));
});
for (auto& sessionKillToken : sessionKillTokens) {
@@ -73,11 +74,11 @@ void killSessionsAction(OperationContext* opCtx,
// TODO (SERVER-33850): Rename KillAllSessionsByPattern and
// ScopedKillAllSessionsByPatternImpersonator to not refer to session kill
- const KillAllSessionsByPattern* pattern = matcher.match(session->getSessionId());
+ const KillAllSessionsByPattern* pattern = matcher.match(session.getSessionId());
invariant(pattern);
ScopedKillAllSessionsByPatternImpersonator impersonator(opCtx, *pattern);
- killSessionFn(session.get());
+ killSessionFn(opCtx, session);
}
}
@@ -86,12 +87,13 @@ void killSessionsAction(OperationContext* opCtx,
void killSessionsLocalKillTransactions(OperationContext* opCtx,
const SessionKiller::Matcher& matcher,
ErrorCodes::Error reason) {
- killSessionsAction(
- opCtx,
- matcher,
- [](Session*) { return true; },
- [](Session* session) { TransactionParticipant::get(session)->abortArbitraryTransaction(); },
- reason);
+ killSessionsAction(opCtx,
+ matcher,
+ [](const ObservableSession&) { return true; },
+ [](OperationContext* opCtx, const SessionToKill& session) {
+ TransactionParticipant::get(session.get())->abortArbitraryTransaction();
+ },
+ reason);
}
SessionKiller::Result killSessionsLocal(OperationContext* opCtx,
@@ -112,21 +114,21 @@ void killAllExpiredTransactions(OperationContext* opCtx) {
killSessionsAction(
opCtx,
matcherAllSessions,
- [](Session* session) {
- const auto txnParticipant = TransactionParticipant::get(session);
+ [when = opCtx->getServiceContext()->getPreciseClockSource()->now()](
+ const ObservableSession& session) {
- return txnParticipant->expired();
+ return TransactionParticipant::get(session.get())->expired();
},
- [](Session* session) {
- const auto txnParticipant = TransactionParticipant::get(session);
+ [](OperationContext* opCtx, const SessionToKill& session) {
+ auto txnParticipant = TransactionParticipant::get(session.get());
LOG(0)
<< "Aborting transaction with txnNumber " << txnParticipant->getActiveTxnNumber()
- << " on session " << session->getSessionId().getId()
+ << " on session " << session.getSessionId().getId()
<< " because it has been running for longer than 'transactionLifetimeLimitSeconds'";
- // The try/catch block below is necessary because expired() in the filterFn above could
- // return true for expired, but unprepared transaction, but by the time we get to
+ // The try/catch block below is necessary because expiredAsOf() in the filterFn above
+ // could return true for expired, but unprepared transaction, but by the time we get to
// actually kill it, the participant could theoretically become prepared (being under
// the SessionCatalog mutex doesn't prevent the concurrently running thread from doing
// preparing the participant).
@@ -136,8 +138,9 @@ void killAllExpiredTransactions(OperationContext* opCtx) {
try {
txnParticipant->abortArbitraryTransaction();
} catch (const DBException& ex) {
+ // TODO(schwerin): Can we catch a more specific exception?
warning() << "May have failed to abort expired transaction on session "
- << session->getSessionId().getId() << " due to " << redact(ex.toStatus());
+ << session.getSessionId().getId() << " due to " << redact(ex.toStatus());
}
},
ErrorCodes::ExceededTimeLimit);
@@ -148,8 +151,10 @@ void killSessionsLocalShutdownAllTransactions(OperationContext* opCtx) {
KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)});
killSessionsAction(opCtx,
matcherAllSessions,
- [](Session*) { return true; },
- [](Session* session) { TransactionParticipant::get(session)->shutdown(); },
+ [](const ObservableSession&) { return true; },
+ [](OperationContext* opCtx, const SessionToKill& session) {
+ TransactionParticipant::get(session.get())->shutdown();
+ },
ErrorCodes::InterruptedAtShutdown);
}
@@ -159,15 +164,14 @@ void killSessionsAbortAllPreparedTransactions(OperationContext* opCtx) {
killSessionsAction(
opCtx,
matcherAllSessions,
- [](Session* session) {
+ [](const ObservableSession& session) {
// Filter for sessions that have a prepared transaction.
- const auto txnParticipant = TransactionParticipant::get(session);
- return txnParticipant->transactionIsPrepared();
+ return TransactionParticipant::get(session.get())->transactionIsPrepared();
},
- [](Session* session) {
+ [](OperationContext* opCtx, const SessionToKill& session) {
// Abort the prepared transaction and invalidate the session it is
// associated with.
- TransactionParticipant::get(session)->abortPreparedTransactionForRollback();
+ TransactionParticipant::get(session.get())->abortPreparedTransactionForRollback();
});
}
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index ea3b80b605e..c8bd4509843 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -334,7 +334,6 @@ TEST_F(OpObserverSessionCatalogRollbackTest,
OnRollbackDoesntInvalidateSessionCatalogIfNoSessionOpsRolledBack) {
const NamespaceString nss("testDB", "testColl");
- auto sessionCatalog = SessionCatalog::get(getServiceContext());
auto sessionId = makeLogicalSessionIdForTest();
const TxnNumber txnNum = 0;
@@ -343,10 +342,8 @@ TEST_F(OpObserverSessionCatalogRollbackTest,
{
auto opCtx = cc().makeOperationContext();
opCtx->setLogicalSessionId(sessionId);
-
- // Create a session and sync it from disk
- auto session = sessionCatalog->checkOutSession(opCtx.get());
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ MongoDOperationContextSession ocs(opCtx.get());
+ const auto txnParticipant = TransactionParticipant::get(opCtx.get());
txnParticipant->refreshFromStorageIfNeeded();
// Simulate a write occurring on that session
@@ -369,9 +366,8 @@ TEST_F(OpObserverSessionCatalogRollbackTest,
{
auto opCtx = cc().makeOperationContext();
opCtx->setLogicalSessionId(sessionId);
-
- auto session = sessionCatalog->checkOutSession(opCtx.get());
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ MongoDOperationContextSession ocs(opCtx.get());
+ const auto txnParticipant = TransactionParticipant::get(opCtx.get());
ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(stmtId));
}
}
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index 3ee21d3001b..aeef3174c5c 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -556,14 +556,14 @@ void MongoInterfaceStandalone::_reportCurrentOpsForIdleSessions(OperationContext
? makeSessionFilterForAuthenticatedUsers(opCtx)
: KillAllSessionsByPatternSet{{}});
- sessionCatalog->scanSessions({std::move(sessionFilter)},
- [&](WithLock sessionCatalogLock, Session* session) {
- auto op =
- TransactionParticipant::get(session)->reportStashedState();
- if (!op.isEmpty()) {
- ops->emplace_back(op);
- }
- });
+ sessionCatalog->scanSessions(
+ {std::move(sessionFilter)},
+ [&](const ObservableSession& session) {
+ auto op = TransactionParticipant::get(session.get())->reportStashedState();
+ if (!op.isEmpty()) {
+ ops->emplace_back(op);
+ }
+ });
}
std::unique_ptr<CollatorInterface> MongoInterfaceStandalone::_getCollectionDefaultCollator(
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index 0cbe33a8d74..28437321ef5 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -42,7 +42,7 @@
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/s/migration_session_id.h"
-#include "mongo/db/session_catalog.h"
+#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/transaction_participant.h"
#include "mongo/db/write_concern.h"
#include "mongo/s/client/shard_registry.h"
@@ -204,8 +204,7 @@ BSONObj getNextSessionOplogBatch(OperationContext* opCtx,
* Insert a new oplog entry by converting the oplogBSON into type 'n' oplog with the session
* information. The new oplogEntry will also link to prePostImageTs if not null.
*/
-ProcessOplogResult processSessionOplog(OperationContext* opCtx,
- const BSONObj& oplogBSON,
+ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
const ProcessOplogResult& lastResult) {
auto oplogEntry = parseOplog(oplogBSON);
const auto& sessionInfo = oplogEntry.getOperationSessionInfo();
@@ -247,9 +246,12 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
const auto stmtId = *oplogEntry.getStatementId();
- auto scopedSession = SessionCatalog::get(opCtx)->checkOutSession(opCtx, result.sessionId);
- auto const txnParticipant = TransactionParticipant::get(scopedSession.get());
- txnParticipant->refreshFromStorageIfNeeded();
+ auto uniqueOpCtx = cc().makeOperationContext();
+ auto opCtx = uniqueOpCtx.get();
+ opCtx->setLogicalSessionId(result.sessionId);
+ opCtx->setTxnNumber(result.txnNum);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
txnParticipant->beginOrContinue(result.txnNum, boost::none, boost::none);
try {
@@ -396,10 +398,7 @@ void SessionCatalogMigrationDestination::join() {
*/
void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(ServiceContext* service) {
Client::initThread(
- "sessionCatalogMigration-" + _migrationSessionId.toString(), service, nullptr);
-
- auto uniqueCtx = cc().makeOperationContext();
- auto opCtx = uniqueCtx.get();
+ "sessionCatalogMigrationProducer-" + _migrationSessionId.toString(), service, nullptr);
bool oplogDrainedAfterCommiting = false;
ProcessOplogResult lastResult;
@@ -413,67 +412,75 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
}
}
- auto nextBatch = getNextSessionOplogBatch(opCtx, _fromShard, _migrationSessionId);
- BSONArray oplogArray(nextBatch[kOplogField].Obj());
- BSONArrayIteratorSorted oplogIter(oplogArray);
-
- if (!oplogIter.more()) {
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_state == State::Committing) {
- // The migration is considered done only when it gets an empty result from the
- // source shard while this is in state committing. This is to make sure that it
- // doesn't miss any new oplog created between the time window where this
- // depleted the buffer from the source shard and receiving the commit command.
- if (oplogDrainedAfterCommiting) {
- break;
+ BSONObj nextBatch;
+ BSONArray oplogArray;
+ {
+ auto uniqueCtx = cc().makeOperationContext();
+ auto opCtx = uniqueCtx.get();
+
+ nextBatch = getNextSessionOplogBatch(opCtx, _fromShard, _migrationSessionId);
+ oplogArray = BSONArray{nextBatch[kOplogField].Obj()};
+
+ if (oplogArray.isEmpty()) {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_state == State::Committing) {
+ // The migration is considered done only when it gets an empty result from
+ // the source shard while this is in state committing. This is to make sure
+ // that it doesn't miss any new oplog created between the time window where
+ // this depleted the buffer from the source shard and receiving the commit
+ // command.
+ if (oplogDrainedAfterCommiting) {
+ break;
+ }
+
+ oplogDrainedAfterCommiting = true;
}
+ }
- oplogDrainedAfterCommiting = true;
+ WriteConcernResult unusedWCResult;
+ uassertStatusOK(
+ waitForWriteConcern(opCtx, lastResult.oplogTime, kMajorityWC, &unusedWCResult));
+
+ // We depleted the buffer at least once, transition to ready for commit.
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ // Note: only transition to "ready to commit" if state is not error/force stop.
+ if (_state == State::Migrating) {
+ _state = State::ReadyToCommit;
+ _isStateChanged.notify_all();
+ }
}
- }
- WriteConcernResult unusedWCResult;
- uassertStatusOK(
- waitForWriteConcern(opCtx, lastResult.oplogTime, kMajorityWC, &unusedWCResult));
-
- // We depleted the buffer at least once, transition to ready for commit.
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- // Note: only transition to "ready to commit" if state is not error/force stop.
- if (_state == State::Migrating) {
- _state = State::ReadyToCommit;
- _isStateChanged.notify_all();
+ if (lastOpTimeWaited == lastResult.oplogTime) {
+ // We got an empty result at least twice in a row from the source shard so space
+ // it
+ // out a little bit so we don't hammer the shard
+ opCtx->sleepFor(Milliseconds(200));
}
- }
- if (lastOpTimeWaited == lastResult.oplogTime) {
- // We got an empty result at least twice in a row from the source shard so space it
- // out a little bit so we don't hammer the shard
- opCtx->sleepFor(Milliseconds(200));
+ lastOpTimeWaited = lastResult.oplogTime;
}
-
- lastOpTimeWaited = lastResult.oplogTime;
}
-
- while (oplogIter.more()) {
+ for (BSONArrayIteratorSorted oplogIter(oplogArray); oplogIter.more();) {
try {
- lastResult = processSessionOplog(opCtx, oplogIter.next().Obj(), lastResult);
- } catch (const DBException& ex) {
- if (ex.code() == ErrorCodes::ConflictingOperationInProgress ||
- ex.code() == ErrorCodes::TransactionTooOld) {
- // This means that the server has a newer txnNumber than the oplog being
- // migrated, so just skip it
- continue;
- }
-
- throw;
+ lastResult = processSessionOplog(oplogIter.next().Obj(), lastResult);
+ } catch (const ExceptionFor<ErrorCodes::ConfigurationInProgress>&) {
+ // This means that the server has a newer txnNumber than the oplog being
+ // migrated, so just skip it
+ continue;
+ } catch (const ExceptionFor<ErrorCodes::TransactionTooOld>&) {
+ // This means that the server has a newer txnNumber than the oplog being
+ // migrated, so just skip it
+ continue;
}
}
}
WriteConcernResult unusedWCResult;
- uassertStatusOK(waitForWriteConcern(opCtx, lastResult.oplogTime, kMajorityWC, &unusedWCResult));
+ auto uniqueOpCtx = cc().makeOperationContext();
+ uassertStatusOK(
+ waitForWriteConcern(uniqueOpCtx.get(), lastResult.oplogTime, kMajorityWC, &unusedWCResult));
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
index 451ef2425cd..5cb0aa92c62 100644
--- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
@@ -164,13 +164,14 @@ public:
return _migrationId.value();
}
- ScopedCheckedOutSession getSessionWithTxn(OperationContext* opCtx,
- const LogicalSessionId& sessionId,
- const TxnNumber& txnNum) {
- auto scopedSession = SessionCatalog::get(opCtx)->checkOutSession(opCtx, sessionId);
- const auto txnParticipant = TransactionParticipant::get(scopedSession.get());
+ void setUpSessionWithTxn(OperationContext* opCtx,
+ const LogicalSessionId& sessionId,
+ const TxnNumber& txnNum) {
+ opCtx->setLogicalSessionId(sessionId);
+ opCtx->setTxnNumber(txnNum);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
- return scopedSession;
}
void checkOplog(const repl::OplogEntry& originalOplog, const repl::OplogEntry& oplogToCheck) {
@@ -194,21 +195,17 @@ public:
}
}
- void checkStatementExecuted(OperationContext* opCtx,
- Session* session,
- TxnNumber txnNumber,
- StmtId stmtId) {
- const auto txnParticipant = TransactionParticipant::get(session);
+ void checkStatementExecuted(OperationContext* opCtx, TxnNumber txnNumber, StmtId stmtId) {
+ auto txnParticipant = TransactionParticipant::get(opCtx);
auto oplog = txnParticipant->checkStatementExecuted(stmtId);
ASSERT_TRUE(oplog);
}
void checkStatementExecuted(OperationContext* opCtx,
- Session* session,
TxnNumber txnNumber,
StmtId stmtId,
const repl::OplogEntry& expectedOplog) {
- const auto txnParticipant = TransactionParticipant::get(session);
+ const auto txnParticipant = TransactionParticipant::get(opCtx);
auto oplog = txnParticipant->checkStatementExecuted(stmtId);
ASSERT_TRUE(oplog);
checkOplogWithNestedOplog(expectedOplog, *oplog);
@@ -354,8 +351,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) {
finishSessionExpectSuccess(&sessionMigration);
auto opCtx = operationContext();
- auto session = getSessionWithTxn(opCtx, sessionId, 2);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ setUpSessionWithTxn(opCtx, sessionId, 2);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
@@ -369,9 +368,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) {
ASSERT_FALSE(historyIter.hasNext());
- checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1);
- checkStatementExecuted(opCtx, session.get(), 2, 45, oplog2);
- checkStatementExecuted(opCtx, session.get(), 2, 5, oplog3);
+ checkStatementExecuted(opCtx, 2, 23, oplog1);
+ checkStatementExecuted(opCtx, 2, 45, oplog2);
+ checkStatementExecuted(opCtx, 2, 5, oplog3);
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn) {
@@ -416,15 +415,17 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn
finishSessionExpectSuccess(&sessionMigration);
auto opCtx = operationContext();
- auto session = getSessionWithTxn(opCtx, sessionId, txnNum);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ setUpSessionWithTxn(opCtx, sessionId, txnNum);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
ASSERT_FALSE(historyIter.hasNext());
- checkStatementExecuted(opCtx, session.get(), txnNum, 5, oplog3);
+ checkStatementExecuted(opCtx, txnNum, 5, oplog3);
}
TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparateBatches) {
@@ -468,8 +469,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparate
finishSessionExpectSuccess(&sessionMigration);
auto opCtx = operationContext();
- auto session = getSessionWithTxn(opCtx, sessionId, 2);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ setUpSessionWithTxn(opCtx, sessionId, 2);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
@@ -483,9 +486,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparate
ASSERT_FALSE(historyIter.hasNext());
- checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1);
- checkStatementExecuted(opCtx, session.get(), 2, 45, oplog2);
- checkStatementExecuted(opCtx, session.get(), 2, 5, oplog3);
+ checkStatementExecuted(opCtx, 2, 23, oplog1);
+ checkStatementExecuted(opCtx, 2, 45, oplog2);
+ checkStatementExecuted(opCtx, 2, 5, oplog3);
}
TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession) {
@@ -536,32 +539,40 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession)
auto opCtx = operationContext();
{
- auto session = getSessionWithTxn(opCtx, sessionId1, 2);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ setUpSessionWithTxn(opCtx, sessionId1, 2);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx));
ASSERT_FALSE(historyIter.hasNext());
- checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1);
+ checkStatementExecuted(opCtx, 2, 23, oplog1);
}
{
- auto session = getSessionWithTxn(opCtx, sessionId2, 42);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ // XXX TODO USE A DIFFERENT OPERATION CONTEXT!
+ auto client2 = getServiceContext()->makeClient("client2");
+ AlternativeClientRegion acr(client2);
+ auto opCtx2 = cc().makeOperationContext();
+ setUpSessionWithTxn(opCtx2.get(), sessionId2, 42);
+ MongoDOperationContextSession ocs(opCtx2.get());
+ auto txnParticipant = TransactionParticipant::get(opCtx2.get());
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
- checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
+ checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx2.get()));
ASSERT_TRUE(historyIter.hasNext());
- checkOplogWithNestedOplog(oplog2, historyIter.next(opCtx));
+ checkOplogWithNestedOplog(oplog2, historyIter.next(opCtx2.get()));
ASSERT_FALSE(historyIter.hasNext());
- checkStatementExecuted(opCtx, session.get(), 42, 45, oplog2);
- checkStatementExecuted(opCtx, session.get(), 42, 5, oplog3);
+ checkStatementExecuted(opCtx2.get(), 42, 45, oplog2);
+ checkStatementExecuted(opCtx2.get(), 42, 5, oplog3);
}
}
@@ -612,8 +623,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog)
finishSessionExpectSuccess(&sessionMigration);
auto opCtx = operationContext();
- auto session = getSessionWithTxn(opCtx, sessionId, 2);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ setUpSessionWithTxn(opCtx, sessionId, 2);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
@@ -625,8 +638,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog)
ASSERT_FALSE(historyIter.hasNext());
- checkStatementExecuted(opCtx, session.get(), 2, 23);
- checkStatementExecuted(opCtx, session.get(), 2, 45);
+ checkStatementExecuted(opCtx, 2, 23);
+ checkStatementExecuted(opCtx, 2, 45);
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindAndModify) {
@@ -663,8 +676,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA
finishSessionExpectSuccess(&sessionMigration);
auto opCtx = operationContext();
- auto session = getSessionWithTxn(opCtx, sessionId, 2);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ setUpSessionWithTxn(opCtx, sessionId, 2);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
@@ -716,7 +731,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA
ASSERT_TRUE(newPreImageOplog.getObject2());
ASSERT_TRUE(newPreImageOplog.getObject2().value().isEmpty());
- checkStatementExecuted(opCtx, session.get(), 2, 45, updateOplog);
+ checkStatementExecuted(opCtx, 2, 45, updateOplog);
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFindAndModify) {
@@ -752,8 +767,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind
finishSessionExpectSuccess(&sessionMigration);
auto opCtx = operationContext();
- auto session = getSessionWithTxn(opCtx, sessionId, 2);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ setUpSessionWithTxn(opCtx, sessionId, 2);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
@@ -805,7 +822,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind
ASSERT_TRUE(newPostImageOplog.getObject2());
ASSERT_TRUE(newPostImageOplog.getObject2().value().isEmpty());
- checkStatementExecuted(opCtx, session.get(), 2, 45, updateOplog);
+ checkStatementExecuted(opCtx, 2, 45, updateOplog);
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModifySplitIn2Batches) {
@@ -844,8 +861,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify
ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
auto opCtx = operationContext();
- auto session = getSessionWithTxn(opCtx, sessionId, 2);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ setUpSessionWithTxn(opCtx, sessionId, 2);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
@@ -897,7 +916,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify
ASSERT_TRUE(newPreImageOplog.getObject2());
ASSERT_TRUE(newPreImageOplog.getObject2().value().isEmpty());
- checkStatementExecuted(opCtx, session.get(), 2, 45, updateOplog);
+ checkStatementExecuted(opCtx, 2, 45, updateOplog);
}
TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) {
@@ -944,8 +963,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) {
ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
- auto session = getSessionWithTxn(opCtx, sessionId, 20);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ setUpSessionWithTxn(opCtx, sessionId, 20);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
@@ -956,7 +977,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) {
ASSERT_FALSE(historyIter.hasNext());
- checkStatementExecuted(opCtx, session.get(), 20, 0);
+ checkStatementExecuted(opCtx, 20, 0);
}
TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwrittenByOldMigrateTxn) {
@@ -1006,8 +1027,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt
finishSessionExpectSuccess(&sessionMigration);
- auto session = getSessionWithTxn(opCtx, sessionId, 20);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ setUpSessionWithTxn(opCtx, sessionId, 20);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
@@ -1016,7 +1039,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt
<< "newerSess"),
oplog.getObject());
- checkStatementExecuted(opCtx, session.get(), 20, 0);
+ checkStatementExecuted(opCtx, 20, 0);
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyAfterNetworkError) {
@@ -1186,8 +1209,10 @@ TEST_F(SessionCatalogMigrationDestinationTest,
finishSessionExpectSuccess(&sessionMigration);
- auto session = getSessionWithTxn(opCtx, sessionId, 2);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ setUpSessionWithTxn(opCtx, sessionId, 2);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
@@ -1203,9 +1228,9 @@ TEST_F(SessionCatalogMigrationDestinationTest,
ASSERT_FALSE(historyIter.hasNext());
- checkStatementExecuted(opCtx, session.get(), 2, 0);
- checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1);
- checkStatementExecuted(opCtx, session.get(), 2, 45, oplog2);
+ checkStatementExecuted(opCtx, 2, 0);
+ checkStatementExecuted(opCtx, 2, 23, oplog1);
+ checkStatementExecuted(opCtx, 2, 45, oplog2);
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForConsecutivePreImageOplog) {
@@ -1490,8 +1515,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatem
finishSessionExpectSuccess(&sessionMigration);
- auto session = getSessionWithTxn(opCtx, sessionId, 19);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ setUpSessionWithTxn(opCtx, sessionId, 19);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
@@ -1508,9 +1535,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatem
ASSERT_TRUE(firstInsertOplog.getStatementId());
ASSERT_EQ(30, *firstInsertOplog.getStatementId());
- checkStatementExecuted(opCtx, session.get(), 19, 23, oplog1);
- checkStatementExecuted(opCtx, session.get(), 19, 30);
- checkStatementExecuted(opCtx, session.get(), 19, 45, oplog3);
+ checkStatementExecuted(opCtx, 19, 23, oplog1);
+ checkStatementExecuted(opCtx, 19, 30);
+ checkStatementExecuted(opCtx, 19, 45, oplog3);
}
TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory) {
@@ -1555,8 +1582,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory
ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
auto opCtx = operationContext();
- auto session = getSessionWithTxn(opCtx, *sessionInfo.getSessionId(), 2);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ setUpSessionWithTxn(opCtx, *sessionInfo.getSessionId(), 2);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
@@ -1570,8 +1599,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory
ASSERT_FALSE(historyIter.hasNext());
- checkStatementExecuted(opCtx, session.get(), 2, 23, oplogEntries[0]);
- checkStatementExecuted(opCtx, session.get(), 2, 5, oplogEntries[2]);
+ checkStatementExecuted(opCtx, 2, 23, oplogEntries[0]);
+ checkStatementExecuted(opCtx, 2, 5, oplogEntries[2]);
ASSERT_THROWS(txnParticipant->checkStatementExecuted(38), AssertionException);
}
@@ -1586,9 +1615,10 @@ TEST_F(SessionCatalogMigrationDestinationTest,
// "Start" a new transaction on session 1, so that migrating the entries above will result
// in TransactionTooOld. This should not preclude the entries for session 2 from getting
// applied.
- auto scopedSession =
- SessionCatalog::get(opCtx)->checkOutSession(opCtx, *sessionInfo1.getSessionId());
- const auto txnParticipant = TransactionParticipant::get(scopedSession.get());
+ setUpSessionWithTxn(opCtx, *sessionInfo1.getSessionId(), *sessionInfo1.getTxnNumber());
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
txnParticipant->refreshFromStorageIfNeeded();
txnParticipant->beginOrContinue(3, boost::none, boost::none);
}
@@ -1639,15 +1669,25 @@ TEST_F(SessionCatalogMigrationDestinationTest,
// Check nothing was written for session 1
{
- auto session1 = getSessionWithTxn(opCtx, *sessionInfo1.getSessionId(), 3);
- const auto txnParticipant1 = TransactionParticipant::get(session1.get());
+ auto c1 = getServiceContext()->makeClient("c1");
+ AlternativeClientRegion acr(c1);
+ auto opCtx1 = cc().makeOperationContext();
+ auto opCtx = opCtx1.get();
+ setUpSessionWithTxn(opCtx, *sessionInfo1.getSessionId(), 3);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant1 = TransactionParticipant::get(opCtx);
ASSERT(txnParticipant1->getLastWriteOpTime().isNull());
}
// Check session 2 was correctly updated
{
- auto session2 = getSessionWithTxn(opCtx, *sessionInfo2.getSessionId(), 15);
- const auto txnParticipant2 = TransactionParticipant::get(session2.get());
+ auto c2 = getServiceContext()->makeClient("c2");
+ AlternativeClientRegion acr(c2);
+ auto opCtx2 = cc().makeOperationContext();
+ auto opCtx = opCtx2.get();
+ setUpSessionWithTxn(opCtx, *sessionInfo2.getSessionId(), 15);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant2 = TransactionParticipant::get(opCtx);
TransactionHistoryIterator historyIter(txnParticipant2->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
@@ -1658,8 +1698,8 @@ TEST_F(SessionCatalogMigrationDestinationTest,
ASSERT_FALSE(historyIter.hasNext());
- checkStatementExecuted(opCtx, session2.get(), 15, 56, oplogEntries[1]);
- checkStatementExecuted(opCtx, session2.get(), 15, 55, oplogEntries[2]);
+ checkStatementExecuted(opCtx, 15, 56, oplogEntries[1]);
+ checkStatementExecuted(opCtx, 15, 55, oplogEntries[2]);
}
}
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
deleted file mode 100644
index f72457d7f94..00000000000
--- a/src/mongo/db/session.cpp
+++ /dev/null
@@ -1,86 +0,0 @@
-
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/session.h"
-
-#include "mongo/db/operation_context.h"
-
-namespace mongo {
-
-Session::Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {}
-
-OperationContext* Session::currentOperation() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _checkoutOpCtx;
-}
-
-Session::KillToken Session::kill(WithLock sessionCatalogLock, ErrorCodes::Error reason) {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- const bool firstKiller = (0 == _killsRequested);
- ++_killsRequested;
-
- // For currently checked-out sessions, interrupt the operation context so that the current owner
- // can release the session
- if (firstKiller && _checkoutOpCtx) {
- const auto serviceContext = _checkoutOpCtx->getServiceContext();
-
- stdx::lock_guard<Client> clientLock(*_checkoutOpCtx->getClient());
- serviceContext->killOperation(_checkoutOpCtx, reason);
- }
-
- return KillToken(getSessionId());
-}
-
-bool Session::killed() const {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- return _killsRequested > 0;
-}
-
-void Session::_markCheckedOut(WithLock sessionCatalogLock, OperationContext* checkoutOpCtx) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(!_checkoutOpCtx);
- _checkoutOpCtx = checkoutOpCtx;
-}
-
-void Session::_markCheckedIn(WithLock sessionCatalogLock) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(_checkoutOpCtx);
- _checkoutOpCtx = nullptr;
-}
-
-void Session::_markNotKilled(WithLock sessionCatalogLock, KillToken killToken) {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- invariant(_killsRequested > 0);
- --_killsRequested;
-}
-
-} // namespace mongo
diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h
index ba01d6458f5..ad5a9dfb82b 100644
--- a/src/mongo/db/session.h
+++ b/src/mongo/db/session.h
@@ -32,13 +32,12 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/db/logical_session_id.h"
+#include "mongo/db/operation_context.h"
#include "mongo/util/concurrency/with_lock.h"
#include "mongo/util/decorable.h"
namespace mongo {
-class OperationContext;
-
/**
* A decorable container for state associated with an active session running on a MongoD or MongoS
* server. Refer to SessionCatalog for more information on the semantics of sessions.
@@ -46,10 +45,11 @@ class OperationContext;
class Session : public Decorable<Session> {
MONGO_DISALLOW_COPYING(Session);
+ friend class ObservableSession;
friend class SessionCatalog;
public:
- explicit Session(LogicalSessionId sessionId);
+ explicit Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {}
/**
* The logical session id that this object represents.
@@ -58,72 +58,23 @@ public:
return _sessionId;
}
- /**
- * Returns a pointer to the current operation running on this Session, or nullptr if there is no
- * operation currently running on this Session.
- */
- OperationContext* currentOperation() const;
-
- /**
- * Increments the number of "killers" for this session and returns a 'kill token' to to be
- * passed later on to 'checkOutSessionForKill' method of the SessionCatalog in order to permit
- * the caller to execute any kill cleanup tasks. This token is later on passed to
- * '_markNotKilled' in order to decrement the number of "killers".
- *
- * Marking session as killed is an internal property only that will cause any further calls to
- * 'checkOutSession' to block until 'checkOutSessionForKill' is called the same number of times
- * as 'kill' was called and the returned scoped object destroyed.
- *
- * If the first killer finds the session checked-out, this method will also interrupt the
- * operation context which has it checked-out.
- *
- * Must be called under the owning SessionCatalog's lock.
- */
- struct KillToken {
- KillToken(LogicalSessionId lsid) : lsidToKill(std::move(lsid)) {}
- KillToken(KillToken&&) = default;
- KillToken& operator=(KillToken&&) = default;
-
- LogicalSessionId lsidToKill;
- };
- KillToken kill(WithLock sessionCatalogLock, ErrorCodes::Error reason = ErrorCodes::Interrupted);
-
- /**
- * Returns whether 'kill' has been called on this session.
- */
- bool killed() const;
+ OperationContext* currentOperation_forTest() const {
+ return _checkoutOpCtx;
+ }
private:
- /**
- * Set/clear the current check-out state of the session by storing the operation which has this
- * session currently checked-out.
- *
- * Must be called under the SessionCatalog mutex and internally will acquire the Session mutex.
- */
- void _markCheckedOut(WithLock sessionCatalogLock, OperationContext* checkoutOpCtx);
- void _markCheckedIn(WithLock sessionCatalogLock);
-
- /**
- * Used by the session catalog when checking a session back in after a call to 'kill'. See the
- * comments for 'kill for more details.
- */
- void _markNotKilled(WithLock sessionCatalogLock, KillToken killToken);
-
// The id of the session with which this object is associated
const LogicalSessionId _sessionId;
- // Protects the member variables below. The order of lock acquisition should always be:
- //
- // 1) SessionCatalog mutex (if applicable)
- // 2) Session mutex
- // 3) Any decoration mutexes and/or the currently running Client's lock
- mutable stdx::mutex _mutex;
-
// A pointer back to the currently running operation on this Session, or nullptr if there
// is no operation currently running for the Session.
+ //
+ // May be read by holders of the SessionCatalog mutex. May only be set when clear or cleared
+ // when set, and the opCtx being set or cleared must have its client locked at the time.
OperationContext* _checkoutOpCtx{nullptr};
- // Incremented every time 'kill' is invoked and decremented by '_markNotKilled'.
+ // Counter indicating the number of times ObservableSession::kill has been called on this
+ // session, which have not yet had a corresponding call to checkOutSessionForKill.
int _killsRequested{0};
};
diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp
index 4da7207b735..34bd12390e3 100644
--- a/src/mongo/db/session_catalog.cpp
+++ b/src/mongo/db/session_catalog.cpp
@@ -45,16 +45,16 @@ namespace {
const auto sessionTransactionTableDecoration = ServiceContext::declareDecoration<SessionCatalog>();
const auto operationSessionDecoration =
- OperationContext::declareDecoration<boost::optional<ScopedCheckedOutSession>>();
+ OperationContext::declareDecoration<boost::optional<SessionCatalog::ScopedCheckedOutSession>>();
} // namespace
SessionCatalog::~SessionCatalog() {
stdx::lock_guard<stdx::mutex> lg(_mutex);
for (const auto& entry : _sessions) {
- auto& sri = entry.second;
- invariant(!sri->session.currentOperation());
- invariant(!sri->session.killed());
+ ObservableSession session(lg, entry.second->session);
+ invariant(!session.currentOperation());
+ invariant(!session._killed());
}
}
@@ -72,56 +72,55 @@ SessionCatalog* SessionCatalog::get(ServiceContext* service) {
return &sessionTransactionTable;
}
-ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext* opCtx) {
- invariant(opCtx->getLogicalSessionId());
- return checkOutSession(opCtx, *opCtx->getLogicalSessionId());
-}
-
-ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext* opCtx,
- const LogicalSessionId& lsid) {
+SessionCatalog::ScopedCheckedOutSession SessionCatalog::_checkOutSession(OperationContext* opCtx) {
// This method is not supposed to be called with an already checked-out session due to risk of
// deadlock
+ invariant(opCtx->getLogicalSessionId());
invariant(!operationSessionDecoration(opCtx));
invariant(!opCtx->lockState()->inAWriteUnitOfWork());
invariant(!opCtx->lockState()->isLocked());
stdx::unique_lock<stdx::mutex> ul(_mutex);
- auto sri = _getOrCreateSessionRuntimeInfo(ul, opCtx, lsid);
+ auto sri = _getOrCreateSessionRuntimeInfo(ul, opCtx, *opCtx->getLogicalSessionId());
// Wait until the session is no longer checked out and until the previously scheduled kill has
// completed
- opCtx->waitForConditionOrInterrupt(sri->availableCondVar, ul, [&sri]() {
- return !sri->session.currentOperation() && !sri->session.killed();
+ opCtx->waitForConditionOrInterrupt(sri->availableCondVar, ul, [&ul, &sri]() {
+ ObservableSession osession(ul, sri->session);
+ return !osession.currentOperation() && !osession._killed();
});
- sri->session._markCheckedOut(ul, opCtx);
+ {
+ stdx::lock_guard<Client> lockClient(*opCtx->getClient());
+ sri->session._checkoutOpCtx = opCtx;
+ }
return ScopedCheckedOutSession(
*this, std::move(sri), boost::none /* Not checked out for kill */);
}
-ScopedCheckedOutSession SessionCatalog::checkOutSessionForKill(OperationContext* opCtx,
- Session::KillToken killToken) {
+SessionCatalog::SessionToKill SessionCatalog::checkOutSessionForKill(OperationContext* opCtx,
+ KillToken killToken) {
// This method is not supposed to be called with an already checked-out session due to risk of
// deadlock
invariant(!operationSessionDecoration(opCtx));
invariant(!opCtx->getTxnNumber());
- const auto lsid = killToken.lsidToKill;
-
stdx::unique_lock<stdx::mutex> ul(_mutex);
- auto sri = _getOrCreateSessionRuntimeInfo(ul, opCtx, lsid);
- invariant(sri->session.killed());
+ auto sri = _getOrCreateSessionRuntimeInfo(ul, opCtx, killToken.lsidToKill);
+ invariant(ObservableSession(ul, sri->session)._killed());
// Wait until the session is no longer checked out
- opCtx->waitForConditionOrInterrupt(
- sri->availableCondVar, ul, [&sri]() { return !sri->session.currentOperation(); });
+ opCtx->waitForConditionOrInterrupt(sri->availableCondVar, ul, [&ul, &sri]() {
+ return !ObservableSession(ul, sri->session).currentOperation();
+ });
- invariant(!sri->session.currentOperation());
- sri->session._markCheckedOut(ul, opCtx);
+ {
+ stdx::lock_guard<Client> lockClient(*opCtx->getClient());
+ sri->session._checkoutOpCtx = opCtx;
+ }
- return ScopedCheckedOutSession(
- *this, std::move(sri), std::move(killToken) /* Checked out for kill */);
+ return SessionToKill(ScopedCheckedOutSession(*this, std::move(sri), std::move(killToken)));
}
void SessionCatalog::scanSessions(const SessionKiller::Matcher& matcher,
@@ -132,18 +131,18 @@ void SessionCatalog::scanSessions(const SessionKiller::Matcher& matcher,
for (auto& sessionEntry : _sessions) {
if (matcher.match(sessionEntry.first)) {
- workerFn(lg, &sessionEntry.second->session);
+ workerFn({lg, sessionEntry.second->session});
}
}
}
-Session::KillToken SessionCatalog::killSession(const LogicalSessionId& lsid) {
+SessionCatalog::KillToken SessionCatalog::killSession(const LogicalSessionId& lsid) {
stdx::lock_guard<stdx::mutex> lg(_mutex);
auto it = _sessions.find(lsid);
uassert(ErrorCodes::NoSuchSession, "Session not found", it != _sessions.end());
auto& sri = it->second;
- return sri->session.kill(lg);
+ return ObservableSession(lg, sri->session).kill();
}
std::shared_ptr<SessionCatalog::SessionRuntimeInfo> SessionCatalog::_getOrCreateSessionRuntimeInfo(
@@ -157,21 +156,46 @@ std::shared_ptr<SessionCatalog::SessionRuntimeInfo> SessionCatalog::_getOrCreate
}
void SessionCatalog::_releaseSession(std::shared_ptr<SessionCatalog::SessionRuntimeInfo> sri,
- boost::optional<Session::KillToken> killToken) {
+ boost::optional<SessionCatalog::KillToken> killToken) {
stdx::lock_guard<stdx::mutex> lg(_mutex);
// Make sure we have exactly the same session on the map and that it is still associated with an
// operation context (meaning checked-out)
invariant(_sessions[sri->session.getSessionId()] == sri);
- invariant(sri->session.currentOperation());
-
- sri->session._markCheckedIn(lg);
+ invariant(sri->session._checkoutOpCtx);
+ {
+ stdx::lock_guard<Client> lockClient(*sri->session._checkoutOpCtx->getClient());
+ sri->session._checkoutOpCtx = nullptr;
+ }
sri->availableCondVar.notify_all();
if (killToken) {
- invariant(sri->session.killed());
- sri->session._markNotKilled(lg, std::move(*killToken));
+ invariant(sri->session._killsRequested > 0);
+ --sri->session._killsRequested;
+ }
+}
+
+OperationContext* ObservableSession::currentOperation() const {
+ return _session->_checkoutOpCtx;
+}
+
+SessionCatalog::KillToken ObservableSession::kill(ErrorCodes::Error reason) const {
+ const bool firstKiller = (0 == _session->_killsRequested);
+ ++_session->_killsRequested;
+
+ // For currently checked-out sessions, interrupt the operation context so that the current owner
+ // can release the session
+ if (firstKiller && _session->_checkoutOpCtx) {
+ invariant(_clientLock);
+ const auto serviceContext = _session->_checkoutOpCtx->getServiceContext();
+ serviceContext->killOperation(_session->_checkoutOpCtx, reason);
}
+
+ return SessionCatalog::KillToken(getSessionId());
+}
+
+bool ObservableSession::_killed() const {
+ return _session->_killsRequested > 0;
}
OperationContextSession::OperationContextSession(OperationContext* opCtx) : _opCtx(opCtx) {
@@ -217,7 +241,8 @@ void OperationContextSession::checkIn(OperationContext* opCtx) {
// but destruction of the checkedOutSession must not be, as it takes the SessionCatalog mutex,
// and other code may take the Client lock while holding that mutex.
stdx::unique_lock<Client> lk(*opCtx->getClient());
- ScopedCheckedOutSession sessionToReleaseOutOfLock(std::move(*checkedOutSession));
+ SessionCatalog::ScopedCheckedOutSession sessionToReleaseOutOfLock(
+ std::move(*checkedOutSession));
// This destroys the moved-from ScopedCheckedOutSession, and must be done within the client lock
checkedOutSession = boost::none;
@@ -229,7 +254,7 @@ void OperationContextSession::checkOut(OperationContext* opCtx) {
invariant(!checkedOutSession);
const auto catalog = SessionCatalog::get(opCtx);
- auto scopedCheckedOutSession = catalog->checkOutSession(opCtx);
+ auto scopedCheckedOutSession = catalog->_checkOutSession(opCtx);
// We acquire a Client lock here to guard the construction of this session so that references to
// this session are safe to use while the lock is held
diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h
index 1ead5cad736..c22c96ff8ba 100644
--- a/src/mongo/db/session_catalog.h
+++ b/src/mongo/db/session_catalog.h
@@ -34,7 +34,9 @@
#include <vector>
#include "mongo/base/disallow_copying.h"
+#include "mongo/db/client.h"
#include "mongo/db/logical_session_id.h"
+#include "mongo/db/operation_context.h"
#include "mongo/db/session.h"
#include "mongo/db/session_killer.h"
#include "mongo/stdx/condition_variable.h"
@@ -44,10 +46,7 @@
namespace mongo {
-class OperationContext;
-class ScopedSession;
-class ScopedCheckedOutSession;
-class ServiceContext;
+class ObservableSession;
/**
* Keeps track of the transaction runtime state for every active session on this instance.
@@ -55,10 +54,21 @@ class ServiceContext;
class SessionCatalog {
MONGO_DISALLOW_COPYING(SessionCatalog);
- friend class ScopedSession;
- friend class ScopedCheckedOutSession;
+ friend class ObservableSession;
+ friend class OperationContextSession;
public:
+ class ScopedCheckedOutSession;
+ class SessionToKill;
+
+ struct KillToken {
+ KillToken(LogicalSessionId lsid) : lsidToKill(std::move(lsid)) {}
+ KillToken(KillToken&&) = default;
+ KillToken& operator=(KillToken&&) = default;
+
+ LogicalSessionId lsidToKill;
+ };
+
SessionCatalog() = default;
~SessionCatalog();
@@ -75,28 +85,10 @@ public:
void reset_forTest();
/**
- * Potentially blocking call, which either creates a brand new session object (if one doesn't
- * exist) or "checks-out" the existing one (if it is not currently in use or marked for kill).
- *
- * The 'opCtx'-only variant uses the session information stored on the operation context and the
- * variant, which has the 'lsid' parameter checks-out that session id. Neither of these methods
- * can be called with an already checked-out session.
- *
- * Checking out a session puts it in the 'checked out' state and all subsequent calls to
- * checkout will block until it is checked back in. This happens when the returned object goes
- * out of scope.
- *
- * Throws exception on errors.
- */
- ScopedCheckedOutSession checkOutSession(OperationContext* opCtx);
- ScopedCheckedOutSession checkOutSession(OperationContext* opCtx, const LogicalSessionId& lsid);
-
- /**
- * See the description of 'Session::kill' for more information on the session kill usage
- * pattern.
+ * See the description of 'ObservableSession::kill' for more information on the session kill
+ * usage pattern.
*/
- ScopedCheckedOutSession checkOutSessionForKill(OperationContext* opCtx,
- Session::KillToken killToken);
+ SessionToKill checkOutSessionForKill(OperationContext* opCtx, KillToken killToken);
/**
* Iterates through the SessionCatalog under the SessionCatalog mutex and applies 'workerFn' to
@@ -109,7 +101,7 @@ public:
*
* TODO SERVER-33850: Take Matcher out of the SessionKiller namespace.
*/
- using ScanSessionsCallbackFn = stdx::function<void(WithLock, Session*)>;
+ using ScanSessionsCallbackFn = stdx::function<void(const ObservableSession&)>;
void scanSessions(const SessionKiller::Matcher& matcher,
const ScanSessionsCallbackFn& workerFn);
@@ -117,7 +109,7 @@ public:
* Shortcut to invoke 'kill' on the specified session under the SessionCatalog mutex. Throws a
* NoSuchSession exception if the session doesn't exist.
*/
- Session::KillToken killSession(const LogicalSessionId& lsid);
+ KillToken killSession(const LogicalSessionId& lsid);
private:
struct SessionRuntimeInfo {
@@ -132,6 +124,8 @@ private:
stdx::condition_variable availableCondVar;
};
+ ScopedCheckedOutSession _checkOutSession(OperationContext* opCtx);
+
/**
* May release and re-acquire it zero or more times before returning. The returned
* 'SessionRuntimeInfo' is guaranteed to be linked on the catalog's _txnTable as long as the
@@ -144,7 +138,7 @@ private:
* Makes a session, previously checked out through 'checkoutSession', available again.
*/
void _releaseSession(std::shared_ptr<SessionRuntimeInfo> sri,
- boost::optional<Session::KillToken> killToken);
+ boost::optional<KillToken> killToken);
stdx::mutex _mutex;
@@ -153,19 +147,20 @@ private:
};
/**
- * Scoped object representing a checked-out session. See comments for the 'checkoutSession' method
- * for more information on its behaviour.
+ * Scoped object representing a checked-out session. This type is an implementation detail
+ * of the SessionCatalog.
*/
-class ScopedCheckedOutSession {
- MONGO_DISALLOW_COPYING(ScopedCheckedOutSession);
-
- friend ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext*,
- const LogicalSessionId&);
- friend ScopedCheckedOutSession SessionCatalog::checkOutSessionForKill(OperationContext*,
- Session::KillToken);
-
+class SessionCatalog::ScopedCheckedOutSession {
public:
+ ScopedCheckedOutSession(SessionCatalog& catalog,
+ std::shared_ptr<SessionCatalog::SessionRuntimeInfo> sri,
+ boost::optional<SessionCatalog::KillToken> killToken)
+ : _catalog(catalog), _sri(std::move(sri)), _killToken(std::move(killToken)) {}
+
ScopedCheckedOutSession(ScopedCheckedOutSession&&) = default;
+ ScopedCheckedOutSession& operator=(ScopedCheckedOutSession&&) = delete;
+ ScopedCheckedOutSession(const ScopedCheckedOutSession&) = delete;
+ ScopedCheckedOutSession& operator=(ScopedCheckedOutSession&) = delete;
~ScopedCheckedOutSession() {
if (_sri) {
@@ -190,19 +185,117 @@ public:
}
private:
- ScopedCheckedOutSession(SessionCatalog& catalog,
- std::shared_ptr<SessionCatalog::SessionRuntimeInfo> sri,
- boost::optional<Session::KillToken> killToken)
- : _catalog(catalog), _sri(std::move(sri)), _killToken(std::move(killToken)) {}
-
// The owning session catalog into which the session should be checked back
SessionCatalog& _catalog;
std::shared_ptr<SessionCatalog::SessionRuntimeInfo> _sri;
+ boost::optional<SessionCatalog::KillToken> _killToken;
+};
+
+/**
+ * RAII type returned by SessionCatalog::checkOutSessionForKill.
+ *
+ * After calling kill() on an ObservableSession, let that ObservableSession go out
+ * of scope and in a context outside of SessionCatalog::scanSessions, call checkOutSessionForKill
+ * to get an instance of this type. Then, while holding that instance, perform any cleanup
+ * you need to perform on a session as part of killing it. More details in the description of
+ * ObservableSession::kill, below.
+ */
+class SessionCatalog::SessionToKill {
+public:
+ SessionToKill(ScopedCheckedOutSession&& scos) : _scos(std::move(scos)) {}
+ Session* get() const {
+ return _scos.get();
+ }
+
+ const LogicalSessionId& getSessionId() const {
+ return get()->getSessionId();
+ }
+ OperationContext* currentOperation_forTest() const {
+ return get()->currentOperation_forTest();
+ }
- // Only set if the session was obtained though checkOutSessionForKill
- boost::optional<Session::KillToken> _killToken;
+private:
+ ScopedCheckedOutSession _scos;
};
+using SessionToKill = SessionCatalog::SessionToKill;
+
+/**
+ * This type represents access to a session inside of a scanSessions loop.
+ * If you have one of these, you're in a scanSessions callback context, and so
+ * have locked the whole catalog and, if the observed session is bound to an operation context,
+ * you hold that operation context's client's mutex, as well.
+ */
+class ObservableSession {
+public:
+ ObservableSession(const ObservableSession&) = delete;
+ ObservableSession(ObservableSession&&) = delete;
+ ObservableSession& operator=(const ObservableSession&) = delete;
+ ObservableSession& operator=(ObservableSession&&) = delete;
+
+ /**
+ * The logical session id that this object represents.
+ */
+ const LogicalSessionId& getSessionId() const {
+ return _session->_sessionId;
+ }
+
+ /**
+ * Returns a pointer to the current operation running on this Session, or nullptr if there is no
+ * operation currently running on this Session.
+ */
+ OperationContext* currentOperation() const;
+
+ /**
+ * Increments the number of "killers" for this session and returns a 'kill token' to to be
+ * passed later on to 'checkOutSessionForKill' method of the SessionCatalog in order to permit
+ * the caller to execute any kill cleanup tasks. This token is later on passed to
+ * '_markNotKilled' in order to decrement the number of "killers".
+ *
+ * Marking session as killed is an internal property only that will cause any further calls to
+ * 'checkOutSession' to block until 'checkOutSessionForKill' is called the same number of times
+ * as 'kill' was called and the returned scoped object destroyed.
+ *
+ * If the first killer finds the session checked-out, this method will also interrupt the
+ * operation context which has it checked-out.
+ */
+ SessionCatalog::KillToken kill(ErrorCodes::Error reason = ErrorCodes::Interrupted) const;
+
+ /**
+ * Returns a pointer to the Session itself.
+ */
+ Session* get() const {
+ return _session;
+ }
+
+private:
+ friend class SessionCatalog;
+
+ static stdx::unique_lock<Client> _lockClientForSession(WithLock, Session* session) {
+ if (const auto opCtx = session->_checkoutOpCtx) {
+ return stdx::unique_lock<Client>{*opCtx->getClient()};
+ }
+ return {};
+ }
+
+ ObservableSession(WithLock wl, Session& session)
+ : _session(&session), _clientLock(_lockClientForSession(std::move(wl), _session)) {}
+
+ /**
+ * Returns whether 'kill' has been called on this session.
+ */
+ bool _killed() const;
+
+ /**
+ * Used by the session catalog when checking a session back in after a call to 'kill'. See the
+ * comments for 'kill for more details.
+ */
+ void _markNotKilled(WithLock sessionCatalogLock, SessionCatalog::KillToken killToken);
+
+ Session* _session;
+ stdx::unique_lock<Client> _clientLock;
+};
+
/**
* Scoped object, which checks out the session specified in the passed operation context and stores
@@ -213,11 +306,17 @@ class OperationContextSession {
MONGO_DISALLOW_COPYING(OperationContextSession);
public:
+ /**
+ * Acquires the session with id opCtx->getLogicalSessionId(). Because a session can only be
+ * checked out by one user at a time, construction of OperationContextSession can block waiting
+ * for the desired session to be checked in by another user.
+ */
OperationContextSession(OperationContext* opCtx);
~OperationContextSession();
/**
- * Returns the session checked out in the constructor.
+ * Returns the session currently checked out by "opCtx", or nullptr if the opCtx has no
+ * checked out session.
*/
static Session* get(OperationContext* opCtx);
diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp
index c5fae50fe9c..dd6072af24e 100644
--- a/src/mongo/db/session_catalog_mongod.cpp
+++ b/src/mongo/db/session_catalog_mongod.cpp
@@ -74,8 +74,9 @@ auto getThreadPool(OperationContext* opCtx) {
return &sessionTasksExecutor(opCtx->getServiceContext()).threadPool;
}
-void killSessionTokensFunction(OperationContext* opCtx,
- std::shared_ptr<std::vector<Session::KillToken>> sessionKillTokens) {
+void killSessionTokensFunction(
+ OperationContext* opCtx,
+ std::shared_ptr<std::vector<SessionCatalog::KillToken>> sessionKillTokens) {
if (sessionKillTokens->empty())
return;
@@ -105,7 +106,7 @@ void MongoDSessionCatalog::onStepUp(OperationContext* opCtx) {
const auto catalog = SessionCatalog::get(opCtx);
// The use of shared_ptr here is in order to work around the limitation of stdx::function that
// the functor must be copyable.
- auto sessionKillTokens = std::make_shared<std::vector<Session::KillToken>>();
+ auto sessionKillTokens = std::make_shared<std::vector<SessionCatalog::KillToken>>();
// Scan all sessions and reacquire locks for prepared transactions.
// There may be sessions that are checked out during this scan, but none of them
@@ -115,14 +116,14 @@ void MongoDSessionCatalog::onStepUp(OperationContext* opCtx) {
SessionKiller::Matcher matcher(
KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)});
- catalog->scanSessions(matcher, [&](WithLock sessionCatalogLock, Session* session) {
- const auto txnParticipant = TransactionParticipant::get(session);
+ catalog->scanSessions(matcher, [&](const ObservableSession& session) {
+ const auto txnParticipant = TransactionParticipant::get(session.get());
if (!txnParticipant->inMultiDocumentTransaction()) {
- sessionKillTokens->emplace_back(session->kill(sessionCatalogLock));
+ sessionKillTokens->emplace_back(session.kill());
}
if (txnParticipant->transactionIsPrepared()) {
- sessionIdToReacquireLocks.emplace_back(session->getSessionId());
+ sessionIdToReacquireLocks.emplace_back(session.getSessionId());
}
});
killSessionTokensFunction(opCtx, sessionKillTokens);
@@ -131,10 +132,12 @@ void MongoDSessionCatalog::onStepUp(OperationContext* opCtx) {
// Create a new opCtx because we need an empty locker to refresh the locks.
auto newClient = opCtx->getServiceContext()->makeClient("restore-prepared-txn");
AlternativeClientRegion acr(newClient);
- auto newOpCtx = cc().makeOperationContext();
for (const auto& sessionId : sessionIdToReacquireLocks) {
- auto scopedSessionCheckOut = catalog->checkOutSession(newOpCtx.get(), sessionId);
- auto txnParticipant = TransactionParticipant::get(scopedSessionCheckOut.get());
+ auto newOpCtx = cc().makeOperationContext();
+ newOpCtx->setLogicalSessionId(sessionId);
+ MongoDOperationContextSession ocs(newOpCtx.get());
+ auto txnParticipant =
+ TransactionParticipant::get(OperationContextSession::get(newOpCtx.get()));
txnParticipant->refreshLocksForPreparedTransaction(newOpCtx.get(), false);
}
}
@@ -195,7 +198,7 @@ void MongoDSessionCatalog::invalidateSessions(OperationContext* opCtx,
// The use of shared_ptr here is in order to work around the limitation of stdx::function that
// the functor must be copyable.
- auto sessionKillTokens = std::make_shared<std::vector<Session::KillToken>>();
+ auto sessionKillTokens = std::make_shared<std::vector<SessionCatalog::KillToken>>();
if (singleSessionDoc) {
sessionKillTokens->emplace_back(catalog->killSession(LogicalSessionId::parse(
@@ -203,10 +206,9 @@ void MongoDSessionCatalog::invalidateSessions(OperationContext* opCtx,
} else {
SessionKiller::Matcher matcher(
KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)});
- catalog->scanSessions(
- matcher, [&sessionKillTokens](WithLock sessionCatalogLock, Session* session) {
- sessionKillTokens->emplace_back(session->kill(sessionCatalogLock));
- });
+ catalog->scanSessions(matcher, [&sessionKillTokens](const ObservableSession& session) {
+ sessionKillTokens->emplace_back(session.kill());
+ });
}
killSessionTokensFunction(opCtx, sessionKillTokens);
diff --git a/src/mongo/db/session_catalog_test.cpp b/src/mongo/db/session_catalog_test.cpp
index 9d9e532abad..b65a933d6c7 100644
--- a/src/mongo/db/session_catalog_test.cpp
+++ b/src/mongo/db/session_catalog_test.cpp
@@ -74,11 +74,11 @@ private:
TEST_F(SessionCatalogTestWithDefaultOpCtx, CheckoutAndReleaseSession) {
_opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
+ OperationContextSession ocs(_opCtx);
- auto scopedSession = catalog()->checkOutSession(_opCtx);
-
- ASSERT(scopedSession.get());
- ASSERT_EQ(*_opCtx->getLogicalSessionId(), scopedSession->getSessionId());
+ auto session = OperationContextSession::get(_opCtx);
+ ASSERT(session);
+ ASSERT_EQ(*_opCtx->getLogicalSessionId(), session->getSessionId());
}
TEST_F(SessionCatalogTestWithDefaultOpCtx, OperationContextCheckedOutSession) {
@@ -120,8 +120,8 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, NestedOperationContextSession) {
TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessions) {
std::vector<LogicalSessionId> lsidsFound;
- const auto workerFn = [&lsidsFound](WithLock, Session* session) {
- lsidsFound.push_back(session->getSessionId());
+ const auto workerFn = [&lsidsFound](const ObservableSession& session) {
+ lsidsFound.push_back(session.getSessionId());
};
// Scan over zero Sessions.
@@ -139,7 +139,8 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessions) {
stdx::async(stdx::launch::async, [this, lsid] {
ThreadClient tc(getServiceContext());
auto opCtx = makeOperationContext();
- const auto unusedSession(catalog()->checkOutSession(opCtx.get(), lsid));
+ opCtx->setLogicalSessionId(lsid);
+ OperationContextSession ocs(opCtx.get());
}).get();
}
@@ -163,7 +164,8 @@ TEST_F(SessionCatalogTest, KillSessionWhenSessionIsNotCheckedOut) {
// Create the session so there is something to kill
{
auto opCtx = makeOperationContext();
- const auto unusedSession(catalog()->checkOutSession(opCtx.get(), lsid));
+ opCtx->setLogicalSessionId(lsid);
+ OperationContextSession ocs(opCtx.get());
}
auto killToken = catalog()->killSession(lsid);
@@ -182,7 +184,8 @@ TEST_F(SessionCatalogTest, KillSessionWhenSessionIsNotCheckedOut) {
auto future = stdx::async(stdx::launch::async, [this, lsid] {
ThreadClient tc(getServiceContext());
auto sideOpCtx = Client::getCurrent()->makeOperationContext();
- const auto unusedSession(catalog()->checkOutSession(sideOpCtx.get(), lsid));
+ sideOpCtx->setLogicalSessionId(lsid);
+ OperationContextSession ocs(sideOpCtx.get());
});
ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration()));
@@ -190,13 +193,14 @@ TEST_F(SessionCatalogTest, KillSessionWhenSessionIsNotCheckedOut) {
{
auto opCtx = makeOperationContext();
auto scopedSession = catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken));
- ASSERT_EQ(opCtx.get(), scopedSession->currentOperation());
+ ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest());
}
// Make sure that session check-out after kill succeeds again
{
auto opCtx = makeOperationContext();
- const auto unusedSession(catalog()->checkOutSession(opCtx.get(), lsid));
+ opCtx->setLogicalSessionId(lsid);
+ OperationContextSession ocs(opCtx.get());
}
// Make sure the "regular operation" eventually is able to proceed and use the just killed
@@ -225,8 +229,7 @@ TEST_F(SessionCatalogTest, KillSessionWhenSessionIsCheckedOut) {
auto sideOpCtx = Client::getCurrent()->makeOperationContext();
sideOpCtx->setLogicalSessionId(lsid);
sideOpCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired);
-
- const auto unusedSession(catalog()->checkOutSession(sideOpCtx.get(), lsid));
+ OperationContextSession ocs(sideOpCtx.get());
});
ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::MaxTimeMSExpired);
@@ -248,7 +251,8 @@ TEST_F(SessionCatalogTest, KillSessionWhenSessionIsCheckedOut) {
auto future = stdx::async(stdx::launch::async, [this, lsid] {
ThreadClient tc(getServiceContext());
auto sideOpCtx = Client::getCurrent()->makeOperationContext();
- const auto unusedSession(catalog()->checkOutSession(sideOpCtx.get(), lsid));
+ sideOpCtx->setLogicalSessionId(lsid);
+ OperationContextSession ocs(sideOpCtx.get());
});
ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration()));
@@ -256,13 +260,14 @@ TEST_F(SessionCatalogTest, KillSessionWhenSessionIsCheckedOut) {
{
auto opCtx = makeOperationContext();
auto scopedSession = catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken));
- ASSERT_EQ(opCtx.get(), scopedSession->currentOperation());
+ ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest());
}
// Make sure that session check-out after kill succeeds again
{
auto opCtx = makeOperationContext();
- const auto unusedSession(catalog()->checkOutSession(opCtx.get(), lsid));
+ opCtx->setLogicalSessionId(lsid);
+ OperationContextSession ocs(opCtx.get());
}
// Make sure the "regular operation" eventually is able to proceed and use the just killed
@@ -276,7 +281,8 @@ TEST_F(SessionCatalogTest, MarkSessionAsKilledCanBeCalledMoreThanOnce) {
// Create the session so there is something to kill
{
auto opCtx = makeOperationContext();
- const auto unusedSession(catalog()->checkOutSession(opCtx.get(), lsid));
+ opCtx->setLogicalSessionId(lsid);
+ OperationContextSession ocs(opCtx.get());
}
auto killToken1 = catalog()->killSession(lsid);
@@ -292,13 +298,13 @@ TEST_F(SessionCatalogTest, MarkSessionAsKilledCanBeCalledMoreThanOnce) {
OperationContextSession(opCtx.get()), AssertionException, ErrorCodes::MaxTimeMSExpired);
}
- boost::optional<Session::KillToken> killTokenWhileSessionIsCheckedOutForKill;
+ boost::optional<SessionCatalog::KillToken> killTokenWhileSessionIsCheckedOutForKill;
// Finish the first killer of the session
{
auto opCtx = makeOperationContext();
auto scopedSession = catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken1));
- ASSERT_EQ(opCtx.get(), scopedSession->currentOperation());
+ ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest());
// Killing a session while checked out for kill should not affect the killers
killTokenWhileSessionIsCheckedOutForKill.emplace(catalog()->killSession(lsid));
@@ -316,13 +322,13 @@ TEST_F(SessionCatalogTest, MarkSessionAsKilledCanBeCalledMoreThanOnce) {
{
auto opCtx = makeOperationContext();
auto scopedSession = catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken2));
- ASSERT_EQ(opCtx.get(), scopedSession->currentOperation());
+ ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest());
}
{
auto opCtx = makeOperationContext();
auto scopedSession = catalog()->checkOutSessionForKill(
opCtx.get(), std::move(*killTokenWhileSessionIsCheckedOutForKill));
- ASSERT_EQ(opCtx.get(), scopedSession->currentOperation());
+ ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest());
}
}
@@ -379,7 +385,8 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, KillSessionsThroughScanSessions) {
{
auto sideOpCtx = Client::getCurrent()->makeOperationContext();
- const auto unusedSession(catalog()->checkOutSession(sideOpCtx.get(), lsid));
+ sideOpCtx->setLogicalSessionId(lsid);
+ OperationContextSession ocs(sideOpCtx.get());
firstUseOfTheSessionReachedBarrier.countDownAndWait();
@@ -391,8 +398,7 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, KillSessionsThroughScanSessions) {
{
auto sideOpCtx = Client::getCurrent()->makeOperationContext();
sideOpCtx->setLogicalSessionId(lsid);
-
- const auto unusedSession(catalog()->checkOutSession(sideOpCtx.get(), lsid));
+ OperationContextSession ocs(sideOpCtx.get());
}
}));
}
@@ -402,14 +408,13 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, KillSessionsThroughScanSessions) {
// Kill the first and the third sessions
{
- std::vector<Session::KillToken> firstAndThirdTokens;
+ std::vector<SessionCatalog::KillToken> firstAndThirdTokens;
catalog()->scanSessions(
SessionKiller::Matcher(
KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(_opCtx)}),
- [&lsids, &firstAndThirdTokens](WithLock sessionCatalogLock, Session* session) {
- if (session->getSessionId() == lsids[0] || session->getSessionId() == lsids[2])
- firstAndThirdTokens.emplace_back(
- session->kill(sessionCatalogLock, ErrorCodes::ExceededTimeLimit));
+ [&lsids, &firstAndThirdTokens](const ObservableSession& session) {
+ if (session.getSessionId() == lsids[0] || session.getSessionId() == lsids[2])
+ firstAndThirdTokens.emplace_back(session.kill(ErrorCodes::ExceededTimeLimit));
});
ASSERT_EQ(2U, firstAndThirdTokens.size());
for (auto& killToken : firstAndThirdTokens) {
@@ -422,14 +427,13 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, KillSessionsThroughScanSessions) {
// Kill the second session
{
- std::vector<Session::KillToken> secondToken;
+ std::vector<SessionCatalog::KillToken> secondToken;
catalog()->scanSessions(
SessionKiller::Matcher(
KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(_opCtx)}),
- [&lsids, &secondToken](WithLock sessionCatalogLock, Session* session) {
- if (session->getSessionId() == lsids[1])
- secondToken.emplace_back(
- session->kill(sessionCatalogLock, ErrorCodes::ExceededTimeLimit));
+ [&lsids, &secondToken](const ObservableSession& session) {
+ if (session.getSessionId() == lsids[1])
+ secondToken.emplace_back(session.kill(ErrorCodes::ExceededTimeLimit));
});
ASSERT_EQ(1U, secondToken.size());
for (auto& killToken : secondToken) {
@@ -472,13 +476,13 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, ConcurrentCheckOutAndKill) {
sideOpCtx->setLogicalSessionId(lsid);
// Kill the session
- std::vector<Session::KillToken> killTokens;
- catalog()->scanSessions(SessionKiller::Matcher(KillAllSessionsByPatternSet{
- makeKillAllSessionsByPattern(sideOpCtx.get())}),
- [&killTokens](WithLock sessionCatalogLock, Session* session) {
- killTokens.emplace_back(session->kill(
- sessionCatalogLock, ErrorCodes::InternalError));
- });
+ std::vector<SessionCatalog::KillToken> killTokens;
+ catalog()->scanSessions(
+ SessionKiller::Matcher(
+ KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(sideOpCtx.get())}),
+ [&killTokens](const ObservableSession& session) {
+ killTokens.emplace_back(session.kill(ErrorCodes::InternalError));
+ });
ASSERT_EQ(1U, killTokens.size());
auto checkOutSessionForKill(
catalog()->checkOutSessionForKill(sideOpCtx.get(), std::move(killTokens[0])));
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 066790d2236..83361ba3be8 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -307,7 +307,7 @@ const LogicalSessionId& TransactionParticipant::_sessionId() const {
OperationContext* TransactionParticipant::_opCtx() const {
const auto* owningSession = getTransactionParticipant.owner(this);
- auto* opCtx = owningSession->currentOperation();
+ auto* opCtx = owningSession->currentOperation_forTest();
invariant(opCtx);
return opCtx;
}
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index b20d33efa3a..16c2be1b588 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -1144,9 +1144,8 @@ TEST_F(TxnParticipantTest, CannotStartNewTransactionWhilePreparedTransactionInPr
](OperationContext * newOpCtx) {
newOpCtx->setLogicalSessionId(lsid);
newOpCtx->setTxnNumber(txnNumberToStart);
-
- auto session = SessionCatalog::get(newOpCtx)->checkOutSession(newOpCtx);
- auto txnParticipant = TransactionParticipant::get(session.get());
+ MongoDOperationContextSession ocs(newOpCtx);
+ auto txnParticipant = TransactionParticipant::get(newOpCtx);
ASSERT_THROWS_CODE(txnParticipant->beginOrContinue(txnNumberToStart, false, true),
AssertionException,