summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/kill_sessions_local.cpp68
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp12
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.cpp16
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp121
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination_test.cpp194
-rw-r--r--src/mongo/db/session.cpp86
-rw-r--r--src/mongo/db/session.h71
-rw-r--r--src/mongo/db/session_catalog.cpp101
-rw-r--r--src/mongo/db/session_catalog.h195
-rw-r--r--src/mongo/db/session_catalog_mongod.cpp32
-rw-r--r--src/mongo/db/session_catalog_test.cpp86
-rw-r--r--src/mongo/db/transaction_participant.cpp2
-rw-r--r--src/mongo/db/transaction_participant_test.cpp5
14 files changed, 515 insertions, 475 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 41bfd7cbaa8..5cd1763cf13 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -668,7 +668,6 @@ env.Library(
target='session_catalog',
source=[
'session_catalog.cpp',
- 'session.cpp',
],
LIBDEPS=[
'kill_sessions',
diff --git a/src/mongo/db/kill_sessions_local.cpp b/src/mongo/db/kill_sessions_local.cpp
index 56ffabf14cf..79b7283d4c3 100644
--- a/src/mongo/db/kill_sessions_local.cpp
+++ b/src/mongo/db/kill_sessions_local.cpp
@@ -55,17 +55,18 @@ namespace {
* context with 'reason' as the code.
* 3) Finish killing the selected and interrupted sessions through the 'killSessionFn'.
*/
-void killSessionsAction(OperationContext* opCtx,
- const SessionKiller::Matcher& matcher,
- const stdx::function<bool(Session*)>& filterFn,
- const stdx::function<void(Session*)>& killSessionFn,
- ErrorCodes::Error reason = ErrorCodes::Interrupted) {
+void killSessionsAction(
+ OperationContext* opCtx,
+ const SessionKiller::Matcher& matcher,
+ const stdx::function<bool(const ObservableSession&)>& filterFn,
+ const stdx::function<void(OperationContext*, const SessionToKill&)>& killSessionFn,
+ ErrorCodes::Error reason = ErrorCodes::Interrupted) {
const auto catalog = SessionCatalog::get(opCtx);
- std::vector<Session::KillToken> sessionKillTokens;
- catalog->scanSessions(matcher, [&](WithLock sessionCatalogLock, Session* session) {
+ std::vector<SessionCatalog::KillToken> sessionKillTokens;
+ catalog->scanSessions(matcher, [&](const ObservableSession& session) {
if (filterFn(session))
- sessionKillTokens.emplace_back(session->kill(sessionCatalogLock, reason));
+ sessionKillTokens.emplace_back(session.kill(reason));
});
for (auto& sessionKillToken : sessionKillTokens) {
@@ -73,11 +74,11 @@ void killSessionsAction(OperationContext* opCtx,
// TODO (SERVER-33850): Rename KillAllSessionsByPattern and
// ScopedKillAllSessionsByPatternImpersonator to not refer to session kill
- const KillAllSessionsByPattern* pattern = matcher.match(session->getSessionId());
+ const KillAllSessionsByPattern* pattern = matcher.match(session.getSessionId());
invariant(pattern);
ScopedKillAllSessionsByPatternImpersonator impersonator(opCtx, *pattern);
- killSessionFn(session.get());
+ killSessionFn(opCtx, session);
}
}
@@ -86,12 +87,13 @@ void killSessionsAction(OperationContext* opCtx,
void killSessionsLocalKillTransactions(OperationContext* opCtx,
const SessionKiller::Matcher& matcher,
ErrorCodes::Error reason) {
- killSessionsAction(
- opCtx,
- matcher,
- [](Session*) { return true; },
- [](Session* session) { TransactionParticipant::get(session)->abortArbitraryTransaction(); },
- reason);
+ killSessionsAction(opCtx,
+ matcher,
+ [](const ObservableSession&) { return true; },
+ [](OperationContext* opCtx, const SessionToKill& session) {
+ TransactionParticipant::get(session.get())->abortArbitraryTransaction();
+ },
+ reason);
}
SessionKiller::Result killSessionsLocal(OperationContext* opCtx,
@@ -112,21 +114,21 @@ void killAllExpiredTransactions(OperationContext* opCtx) {
killSessionsAction(
opCtx,
matcherAllSessions,
- [](Session* session) {
- const auto txnParticipant = TransactionParticipant::get(session);
+ [when = opCtx->getServiceContext()->getPreciseClockSource()->now()](
+ const ObservableSession& session) {
- return txnParticipant->expired();
+ return TransactionParticipant::get(session.get())->expired();
},
- [](Session* session) {
- const auto txnParticipant = TransactionParticipant::get(session);
+ [](OperationContext* opCtx, const SessionToKill& session) {
+ auto txnParticipant = TransactionParticipant::get(session.get());
LOG(0)
<< "Aborting transaction with txnNumber " << txnParticipant->getActiveTxnNumber()
- << " on session " << session->getSessionId().getId()
+ << " on session " << session.getSessionId().getId()
<< " because it has been running for longer than 'transactionLifetimeLimitSeconds'";
- // The try/catch block below is necessary because expired() in the filterFn above could
- // return true for expired, but unprepared transaction, but by the time we get to
+ // The try/catch block below is necessary because expiredAsOf() in the filterFn above
+ // could return true for expired, but unprepared transaction, but by the time we get to
// actually kill it, the participant could theoretically become prepared (being under
// the SessionCatalog mutex doesn't prevent the concurrently running thread from doing
// preparing the participant).
@@ -136,8 +138,9 @@ void killAllExpiredTransactions(OperationContext* opCtx) {
try {
txnParticipant->abortArbitraryTransaction();
} catch (const DBException& ex) {
+ // TODO(schwerin): Can we catch a more specific exception?
warning() << "May have failed to abort expired transaction on session "
- << session->getSessionId().getId() << " due to " << redact(ex.toStatus());
+ << session.getSessionId().getId() << " due to " << redact(ex.toStatus());
}
},
ErrorCodes::ExceededTimeLimit);
@@ -148,8 +151,10 @@ void killSessionsLocalShutdownAllTransactions(OperationContext* opCtx) {
KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)});
killSessionsAction(opCtx,
matcherAllSessions,
- [](Session*) { return true; },
- [](Session* session) { TransactionParticipant::get(session)->shutdown(); },
+ [](const ObservableSession&) { return true; },
+ [](OperationContext* opCtx, const SessionToKill& session) {
+ TransactionParticipant::get(session.get())->shutdown();
+ },
ErrorCodes::InterruptedAtShutdown);
}
@@ -159,15 +164,14 @@ void killSessionsAbortAllPreparedTransactions(OperationContext* opCtx) {
killSessionsAction(
opCtx,
matcherAllSessions,
- [](Session* session) {
+ [](const ObservableSession& session) {
// Filter for sessions that have a prepared transaction.
- const auto txnParticipant = TransactionParticipant::get(session);
- return txnParticipant->transactionIsPrepared();
+ return TransactionParticipant::get(session.get())->transactionIsPrepared();
},
- [](Session* session) {
+ [](OperationContext* opCtx, const SessionToKill& session) {
// Abort the prepared transaction and invalidate the session it is
// associated with.
- TransactionParticipant::get(session)->abortPreparedTransactionForRollback();
+ TransactionParticipant::get(session.get())->abortPreparedTransactionForRollback();
});
}
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index ea3b80b605e..c8bd4509843 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -334,7 +334,6 @@ TEST_F(OpObserverSessionCatalogRollbackTest,
OnRollbackDoesntInvalidateSessionCatalogIfNoSessionOpsRolledBack) {
const NamespaceString nss("testDB", "testColl");
- auto sessionCatalog = SessionCatalog::get(getServiceContext());
auto sessionId = makeLogicalSessionIdForTest();
const TxnNumber txnNum = 0;
@@ -343,10 +342,8 @@ TEST_F(OpObserverSessionCatalogRollbackTest,
{
auto opCtx = cc().makeOperationContext();
opCtx->setLogicalSessionId(sessionId);
-
- // Create a session and sync it from disk
- auto session = sessionCatalog->checkOutSession(opCtx.get());
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ MongoDOperationContextSession ocs(opCtx.get());
+ const auto txnParticipant = TransactionParticipant::get(opCtx.get());
txnParticipant->refreshFromStorageIfNeeded();
// Simulate a write occurring on that session
@@ -369,9 +366,8 @@ TEST_F(OpObserverSessionCatalogRollbackTest,
{
auto opCtx = cc().makeOperationContext();
opCtx->setLogicalSessionId(sessionId);
-
- auto session = sessionCatalog->checkOutSession(opCtx.get());
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ MongoDOperationContextSession ocs(opCtx.get());
+ const auto txnParticipant = TransactionParticipant::get(opCtx.get());
ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(stmtId));
}
}
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index 3ee21d3001b..aeef3174c5c 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -556,14 +556,14 @@ void MongoInterfaceStandalone::_reportCurrentOpsForIdleSessions(OperationContext
? makeSessionFilterForAuthenticatedUsers(opCtx)
: KillAllSessionsByPatternSet{{}});
- sessionCatalog->scanSessions({std::move(sessionFilter)},
- [&](WithLock sessionCatalogLock, Session* session) {
- auto op =
- TransactionParticipant::get(session)->reportStashedState();
- if (!op.isEmpty()) {
- ops->emplace_back(op);
- }
- });
+ sessionCatalog->scanSessions(
+ {std::move(sessionFilter)},
+ [&](const ObservableSession& session) {
+ auto op = TransactionParticipant::get(session.get())->reportStashedState();
+ if (!op.isEmpty()) {
+ ops->emplace_back(op);
+ }
+ });
}
std::unique_ptr<CollatorInterface> MongoInterfaceStandalone::_getCollectionDefaultCollator(
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index 0cbe33a8d74..28437321ef5 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -42,7 +42,7 @@
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/s/migration_session_id.h"
-#include "mongo/db/session_catalog.h"
+#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/transaction_participant.h"
#include "mongo/db/write_concern.h"
#include "mongo/s/client/shard_registry.h"
@@ -204,8 +204,7 @@ BSONObj getNextSessionOplogBatch(OperationContext* opCtx,
* Insert a new oplog entry by converting the oplogBSON into type 'n' oplog with the session
* information. The new oplogEntry will also link to prePostImageTs if not null.
*/
-ProcessOplogResult processSessionOplog(OperationContext* opCtx,
- const BSONObj& oplogBSON,
+ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
const ProcessOplogResult& lastResult) {
auto oplogEntry = parseOplog(oplogBSON);
const auto& sessionInfo = oplogEntry.getOperationSessionInfo();
@@ -247,9 +246,12 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
const auto stmtId = *oplogEntry.getStatementId();
- auto scopedSession = SessionCatalog::get(opCtx)->checkOutSession(opCtx, result.sessionId);
- auto const txnParticipant = TransactionParticipant::get(scopedSession.get());
- txnParticipant->refreshFromStorageIfNeeded();
+ auto uniqueOpCtx = cc().makeOperationContext();
+ auto opCtx = uniqueOpCtx.get();
+ opCtx->setLogicalSessionId(result.sessionId);
+ opCtx->setTxnNumber(result.txnNum);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
txnParticipant->beginOrContinue(result.txnNum, boost::none, boost::none);
try {
@@ -396,10 +398,7 @@ void SessionCatalogMigrationDestination::join() {
*/
void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(ServiceContext* service) {
Client::initThread(
- "sessionCatalogMigration-" + _migrationSessionId.toString(), service, nullptr);
-
- auto uniqueCtx = cc().makeOperationContext();
- auto opCtx = uniqueCtx.get();
+ "sessionCatalogMigrationProducer-" + _migrationSessionId.toString(), service, nullptr);
bool oplogDrainedAfterCommiting = false;
ProcessOplogResult lastResult;
@@ -413,67 +412,75 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
}
}
- auto nextBatch = getNextSessionOplogBatch(opCtx, _fromShard, _migrationSessionId);
- BSONArray oplogArray(nextBatch[kOplogField].Obj());
- BSONArrayIteratorSorted oplogIter(oplogArray);
-
- if (!oplogIter.more()) {
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_state == State::Committing) {
- // The migration is considered done only when it gets an empty result from the
- // source shard while this is in state committing. This is to make sure that it
- // doesn't miss any new oplog created between the time window where this
- // depleted the buffer from the source shard and receiving the commit command.
- if (oplogDrainedAfterCommiting) {
- break;
+ BSONObj nextBatch;
+ BSONArray oplogArray;
+ {
+ auto uniqueCtx = cc().makeOperationContext();
+ auto opCtx = uniqueCtx.get();
+
+ nextBatch = getNextSessionOplogBatch(opCtx, _fromShard, _migrationSessionId);
+ oplogArray = BSONArray{nextBatch[kOplogField].Obj()};
+
+ if (oplogArray.isEmpty()) {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_state == State::Committing) {
+ // The migration is considered done only when it gets an empty result from
+ // the source shard while this is in state committing. This is to make sure
+ // that it doesn't miss any new oplog created between the time window where
+ // this depleted the buffer from the source shard and receiving the commit
+ // command.
+ if (oplogDrainedAfterCommiting) {
+ break;
+ }
+
+ oplogDrainedAfterCommiting = true;
}
+ }
- oplogDrainedAfterCommiting = true;
+ WriteConcernResult unusedWCResult;
+ uassertStatusOK(
+ waitForWriteConcern(opCtx, lastResult.oplogTime, kMajorityWC, &unusedWCResult));
+
+ // We depleted the buffer at least once, transition to ready for commit.
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ // Note: only transition to "ready to commit" if state is not error/force stop.
+ if (_state == State::Migrating) {
+ _state = State::ReadyToCommit;
+ _isStateChanged.notify_all();
+ }
}
- }
- WriteConcernResult unusedWCResult;
- uassertStatusOK(
- waitForWriteConcern(opCtx, lastResult.oplogTime, kMajorityWC, &unusedWCResult));
-
- // We depleted the buffer at least once, transition to ready for commit.
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- // Note: only transition to "ready to commit" if state is not error/force stop.
- if (_state == State::Migrating) {
- _state = State::ReadyToCommit;
- _isStateChanged.notify_all();
+ if (lastOpTimeWaited == lastResult.oplogTime) {
+ // We got an empty result at least twice in a row from the source shard so space
+ // it
+ // out a little bit so we don't hammer the shard
+ opCtx->sleepFor(Milliseconds(200));
}
- }
- if (lastOpTimeWaited == lastResult.oplogTime) {
- // We got an empty result at least twice in a row from the source shard so space it
- // out a little bit so we don't hammer the shard
- opCtx->sleepFor(Milliseconds(200));
+ lastOpTimeWaited = lastResult.oplogTime;
}
-
- lastOpTimeWaited = lastResult.oplogTime;
}
-
- while (oplogIter.more()) {
+ for (BSONArrayIteratorSorted oplogIter(oplogArray); oplogIter.more();) {
try {
- lastResult = processSessionOplog(opCtx, oplogIter.next().Obj(), lastResult);
- } catch (const DBException& ex) {
- if (ex.code() == ErrorCodes::ConflictingOperationInProgress ||
- ex.code() == ErrorCodes::TransactionTooOld) {
- // This means that the server has a newer txnNumber than the oplog being
- // migrated, so just skip it
- continue;
- }
-
- throw;
+ lastResult = processSessionOplog(oplogIter.next().Obj(), lastResult);
+ } catch (const ExceptionFor<ErrorCodes::ConfigurationInProgress>&) {
+ // This means that the server has a newer txnNumber than the oplog being
+ // migrated, so just skip it
+ continue;
+ } catch (const ExceptionFor<ErrorCodes::TransactionTooOld>&) {
+ // This means that the server has a newer txnNumber than the oplog being
+ // migrated, so just skip it
+ continue;
}
}
}
WriteConcernResult unusedWCResult;
- uassertStatusOK(waitForWriteConcern(opCtx, lastResult.oplogTime, kMajorityWC, &unusedWCResult));
+ auto uniqueOpCtx = cc().makeOperationContext();
+ uassertStatusOK(
+ waitForWriteConcern(uniqueOpCtx.get(), lastResult.oplogTime, kMajorityWC, &unusedWCResult));
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
index 451ef2425cd..5cb0aa92c62 100644
--- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
@@ -164,13 +164,14 @@ public:
return _migrationId.value();
}
- ScopedCheckedOutSession getSessionWithTxn(OperationContext* opCtx,
- const LogicalSessionId& sessionId,
- const TxnNumber& txnNum) {
- auto scopedSession = SessionCatalog::get(opCtx)->checkOutSession(opCtx, sessionId);
- const auto txnParticipant = TransactionParticipant::get(scopedSession.get());
+ void setUpSessionWithTxn(OperationContext* opCtx,
+ const LogicalSessionId& sessionId,
+ const TxnNumber& txnNum) {
+ opCtx->setLogicalSessionId(sessionId);
+ opCtx->setTxnNumber(txnNum);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
- return scopedSession;
}
void checkOplog(const repl::OplogEntry& originalOplog, const repl::OplogEntry& oplogToCheck) {
@@ -194,21 +195,17 @@ public:
}
}
- void checkStatementExecuted(OperationContext* opCtx,
- Session* session,
- TxnNumber txnNumber,
- StmtId stmtId) {
- const auto txnParticipant = TransactionParticipant::get(session);
+ void checkStatementExecuted(OperationContext* opCtx, TxnNumber txnNumber, StmtId stmtId) {
+ auto txnParticipant = TransactionParticipant::get(opCtx);
auto oplog = txnParticipant->checkStatementExecuted(stmtId);
ASSERT_TRUE(oplog);
}
void checkStatementExecuted(OperationContext* opCtx,
- Session* session,
TxnNumber txnNumber,
StmtId stmtId,
const repl::OplogEntry& expectedOplog) {
- const auto txnParticipant = TransactionParticipant::get(session);
+ const auto txnParticipant = TransactionParticipant::get(opCtx);
auto oplog = txnParticipant->checkStatementExecuted(stmtId);
ASSERT_TRUE(oplog);
checkOplogWithNestedOplog(expectedOplog, *oplog);
@@ -354,8 +351,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) {
finishSessionExpectSuccess(&sessionMigration);
auto opCtx = operationContext();
- auto session = getSessionWithTxn(opCtx, sessionId, 2);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ setUpSessionWithTxn(opCtx, sessionId, 2);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
@@ -369,9 +368,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) {
ASSERT_FALSE(historyIter.hasNext());
- checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1);
- checkStatementExecuted(opCtx, session.get(), 2, 45, oplog2);
- checkStatementExecuted(opCtx, session.get(), 2, 5, oplog3);
+ checkStatementExecuted(opCtx, 2, 23, oplog1);
+ checkStatementExecuted(opCtx, 2, 45, oplog2);
+ checkStatementExecuted(opCtx, 2, 5, oplog3);
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn) {
@@ -416,15 +415,17 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn
finishSessionExpectSuccess(&sessionMigration);
auto opCtx = operationContext();
- auto session = getSessionWithTxn(opCtx, sessionId, txnNum);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ setUpSessionWithTxn(opCtx, sessionId, txnNum);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
ASSERT_FALSE(historyIter.hasNext());
- checkStatementExecuted(opCtx, session.get(), txnNum, 5, oplog3);
+ checkStatementExecuted(opCtx, txnNum, 5, oplog3);
}
TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparateBatches) {
@@ -468,8 +469,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparate
finishSessionExpectSuccess(&sessionMigration);
auto opCtx = operationContext();
- auto session = getSessionWithTxn(opCtx, sessionId, 2);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ setUpSessionWithTxn(opCtx, sessionId, 2);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
@@ -483,9 +486,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparate
ASSERT_FALSE(historyIter.hasNext());
- checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1);
- checkStatementExecuted(opCtx, session.get(), 2, 45, oplog2);
- checkStatementExecuted(opCtx, session.get(), 2, 5, oplog3);
+ checkStatementExecuted(opCtx, 2, 23, oplog1);
+ checkStatementExecuted(opCtx, 2, 45, oplog2);
+ checkStatementExecuted(opCtx, 2, 5, oplog3);
}
TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession) {
@@ -536,32 +539,40 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession)
auto opCtx = operationContext();
{
- auto session = getSessionWithTxn(opCtx, sessionId1, 2);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ setUpSessionWithTxn(opCtx, sessionId1, 2);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx));
ASSERT_FALSE(historyIter.hasNext());
- checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1);
+ checkStatementExecuted(opCtx, 2, 23, oplog1);
}
{
- auto session = getSessionWithTxn(opCtx, sessionId2, 42);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ // XXX TODO USE A DIFFERENT OPERATION CONTEXT!
+ auto client2 = getServiceContext()->makeClient("client2");
+ AlternativeClientRegion acr(client2);
+ auto opCtx2 = cc().makeOperationContext();
+ setUpSessionWithTxn(opCtx2.get(), sessionId2, 42);
+ MongoDOperationContextSession ocs(opCtx2.get());
+ auto txnParticipant = TransactionParticipant::get(opCtx2.get());
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
- checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
+ checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx2.get()));
ASSERT_TRUE(historyIter.hasNext());
- checkOplogWithNestedOplog(oplog2, historyIter.next(opCtx));
+ checkOplogWithNestedOplog(oplog2, historyIter.next(opCtx2.get()));
ASSERT_FALSE(historyIter.hasNext());
- checkStatementExecuted(opCtx, session.get(), 42, 45, oplog2);
- checkStatementExecuted(opCtx, session.get(), 42, 5, oplog3);
+ checkStatementExecuted(opCtx2.get(), 42, 45, oplog2);
+ checkStatementExecuted(opCtx2.get(), 42, 5, oplog3);
}
}
@@ -612,8 +623,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog)
finishSessionExpectSuccess(&sessionMigration);
auto opCtx = operationContext();
- auto session = getSessionWithTxn(opCtx, sessionId, 2);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ setUpSessionWithTxn(opCtx, sessionId, 2);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
@@ -625,8 +638,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog)
ASSERT_FALSE(historyIter.hasNext());
- checkStatementExecuted(opCtx, session.get(), 2, 23);
- checkStatementExecuted(opCtx, session.get(), 2, 45);
+ checkStatementExecuted(opCtx, 2, 23);
+ checkStatementExecuted(opCtx, 2, 45);
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindAndModify) {
@@ -663,8 +676,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA
finishSessionExpectSuccess(&sessionMigration);
auto opCtx = operationContext();
- auto session = getSessionWithTxn(opCtx, sessionId, 2);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ setUpSessionWithTxn(opCtx, sessionId, 2);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
@@ -716,7 +731,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA
ASSERT_TRUE(newPreImageOplog.getObject2());
ASSERT_TRUE(newPreImageOplog.getObject2().value().isEmpty());
- checkStatementExecuted(opCtx, session.get(), 2, 45, updateOplog);
+ checkStatementExecuted(opCtx, 2, 45, updateOplog);
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFindAndModify) {
@@ -752,8 +767,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind
finishSessionExpectSuccess(&sessionMigration);
auto opCtx = operationContext();
- auto session = getSessionWithTxn(opCtx, sessionId, 2);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ setUpSessionWithTxn(opCtx, sessionId, 2);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
@@ -805,7 +822,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind
ASSERT_TRUE(newPostImageOplog.getObject2());
ASSERT_TRUE(newPostImageOplog.getObject2().value().isEmpty());
- checkStatementExecuted(opCtx, session.get(), 2, 45, updateOplog);
+ checkStatementExecuted(opCtx, 2, 45, updateOplog);
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModifySplitIn2Batches) {
@@ -844,8 +861,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify
ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
auto opCtx = operationContext();
- auto session = getSessionWithTxn(opCtx, sessionId, 2);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ setUpSessionWithTxn(opCtx, sessionId, 2);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
@@ -897,7 +916,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify
ASSERT_TRUE(newPreImageOplog.getObject2());
ASSERT_TRUE(newPreImageOplog.getObject2().value().isEmpty());
- checkStatementExecuted(opCtx, session.get(), 2, 45, updateOplog);
+ checkStatementExecuted(opCtx, 2, 45, updateOplog);
}
TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) {
@@ -944,8 +963,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) {
ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
- auto session = getSessionWithTxn(opCtx, sessionId, 20);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ setUpSessionWithTxn(opCtx, sessionId, 20);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
@@ -956,7 +977,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) {
ASSERT_FALSE(historyIter.hasNext());
- checkStatementExecuted(opCtx, session.get(), 20, 0);
+ checkStatementExecuted(opCtx, 20, 0);
}
TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwrittenByOldMigrateTxn) {
@@ -1006,8 +1027,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt
finishSessionExpectSuccess(&sessionMigration);
- auto session = getSessionWithTxn(opCtx, sessionId, 20);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ setUpSessionWithTxn(opCtx, sessionId, 20);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
@@ -1016,7 +1039,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt
<< "newerSess"),
oplog.getObject());
- checkStatementExecuted(opCtx, session.get(), 20, 0);
+ checkStatementExecuted(opCtx, 20, 0);
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyAfterNetworkError) {
@@ -1186,8 +1209,10 @@ TEST_F(SessionCatalogMigrationDestinationTest,
finishSessionExpectSuccess(&sessionMigration);
- auto session = getSessionWithTxn(opCtx, sessionId, 2);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ setUpSessionWithTxn(opCtx, sessionId, 2);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
@@ -1203,9 +1228,9 @@ TEST_F(SessionCatalogMigrationDestinationTest,
ASSERT_FALSE(historyIter.hasNext());
- checkStatementExecuted(opCtx, session.get(), 2, 0);
- checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1);
- checkStatementExecuted(opCtx, session.get(), 2, 45, oplog2);
+ checkStatementExecuted(opCtx, 2, 0);
+ checkStatementExecuted(opCtx, 2, 23, oplog1);
+ checkStatementExecuted(opCtx, 2, 45, oplog2);
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForConsecutivePreImageOplog) {
@@ -1490,8 +1515,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatem
finishSessionExpectSuccess(&sessionMigration);
- auto session = getSessionWithTxn(opCtx, sessionId, 19);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ setUpSessionWithTxn(opCtx, sessionId, 19);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
@@ -1508,9 +1535,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatem
ASSERT_TRUE(firstInsertOplog.getStatementId());
ASSERT_EQ(30, *firstInsertOplog.getStatementId());
- checkStatementExecuted(opCtx, session.get(), 19, 23, oplog1);
- checkStatementExecuted(opCtx, session.get(), 19, 30);
- checkStatementExecuted(opCtx, session.get(), 19, 45, oplog3);
+ checkStatementExecuted(opCtx, 19, 23, oplog1);
+ checkStatementExecuted(opCtx, 19, 30);
+ checkStatementExecuted(opCtx, 19, 45, oplog3);
}
TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory) {
@@ -1555,8 +1582,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory
ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
auto opCtx = operationContext();
- auto session = getSessionWithTxn(opCtx, *sessionInfo.getSessionId(), 2);
- const auto txnParticipant = TransactionParticipant::get(session.get());
+ setUpSessionWithTxn(opCtx, *sessionInfo.getSessionId(), 2);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
@@ -1570,8 +1599,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory
ASSERT_FALSE(historyIter.hasNext());
- checkStatementExecuted(opCtx, session.get(), 2, 23, oplogEntries[0]);
- checkStatementExecuted(opCtx, session.get(), 2, 5, oplogEntries[2]);
+ checkStatementExecuted(opCtx, 2, 23, oplogEntries[0]);
+ checkStatementExecuted(opCtx, 2, 5, oplogEntries[2]);
ASSERT_THROWS(txnParticipant->checkStatementExecuted(38), AssertionException);
}
@@ -1586,9 +1615,10 @@ TEST_F(SessionCatalogMigrationDestinationTest,
// "Start" a new transaction on session 1, so that migrating the entries above will result
// in TransactionTooOld. This should not preclude the entries for session 2 from getting
// applied.
- auto scopedSession =
- SessionCatalog::get(opCtx)->checkOutSession(opCtx, *sessionInfo1.getSessionId());
- const auto txnParticipant = TransactionParticipant::get(scopedSession.get());
+ setUpSessionWithTxn(opCtx, *sessionInfo1.getSessionId(), *sessionInfo1.getTxnNumber());
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
txnParticipant->refreshFromStorageIfNeeded();
txnParticipant->beginOrContinue(3, boost::none, boost::none);
}
@@ -1639,15 +1669,25 @@ TEST_F(SessionCatalogMigrationDestinationTest,
// Check nothing was written for session 1
{
- auto session1 = getSessionWithTxn(opCtx, *sessionInfo1.getSessionId(), 3);
- const auto txnParticipant1 = TransactionParticipant::get(session1.get());
+ auto c1 = getServiceContext()->makeClient("c1");
+ AlternativeClientRegion acr(c1);
+ auto opCtx1 = cc().makeOperationContext();
+ auto opCtx = opCtx1.get();
+ setUpSessionWithTxn(opCtx, *sessionInfo1.getSessionId(), 3);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant1 = TransactionParticipant::get(opCtx);
ASSERT(txnParticipant1->getLastWriteOpTime().isNull());
}
// Check session 2 was correctly updated
{
- auto session2 = getSessionWithTxn(opCtx, *sessionInfo2.getSessionId(), 15);
- const auto txnParticipant2 = TransactionParticipant::get(session2.get());
+ auto c2 = getServiceContext()->makeClient("c2");
+ AlternativeClientRegion acr(c2);
+ auto opCtx2 = cc().makeOperationContext();
+ auto opCtx = opCtx2.get();
+ setUpSessionWithTxn(opCtx, *sessionInfo2.getSessionId(), 15);
+ MongoDOperationContextSession ocs(opCtx);
+ auto txnParticipant2 = TransactionParticipant::get(opCtx);
TransactionHistoryIterator historyIter(txnParticipant2->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
@@ -1658,8 +1698,8 @@ TEST_F(SessionCatalogMigrationDestinationTest,
ASSERT_FALSE(historyIter.hasNext());
- checkStatementExecuted(opCtx, session2.get(), 15, 56, oplogEntries[1]);
- checkStatementExecuted(opCtx, session2.get(), 15, 55, oplogEntries[2]);
+ checkStatementExecuted(opCtx, 15, 56, oplogEntries[1]);
+ checkStatementExecuted(opCtx, 15, 55, oplogEntries[2]);
}
}
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
deleted file mode 100644
index f72457d7f94..00000000000
--- a/src/mongo/db/session.cpp
+++ /dev/null
@@ -1,86 +0,0 @@
-
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/session.h"
-
-#include "mongo/db/operation_context.h"
-
-namespace mongo {
-
-Session::Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {}
-
-OperationContext* Session::currentOperation() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _checkoutOpCtx;
-}
-
-Session::KillToken Session::kill(WithLock sessionCatalogLock, ErrorCodes::Error reason) {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- const bool firstKiller = (0 == _killsRequested);
- ++_killsRequested;
-
- // For currently checked-out sessions, interrupt the operation context so that the current owner
- // can release the session
- if (firstKiller && _checkoutOpCtx) {
- const auto serviceContext = _checkoutOpCtx->getServiceContext();
-
- stdx::lock_guard<Client> clientLock(*_checkoutOpCtx->getClient());
- serviceContext->killOperation(_checkoutOpCtx, reason);
- }
-
- return KillToken(getSessionId());
-}
-
-bool Session::killed() const {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- return _killsRequested > 0;
-}
-
-void Session::_markCheckedOut(WithLock sessionCatalogLock, OperationContext* checkoutOpCtx) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(!_checkoutOpCtx);
- _checkoutOpCtx = checkoutOpCtx;
-}
-
-void Session::_markCheckedIn(WithLock sessionCatalogLock) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(_checkoutOpCtx);
- _checkoutOpCtx = nullptr;
-}
-
-void Session::_markNotKilled(WithLock sessionCatalogLock, KillToken killToken) {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- invariant(_killsRequested > 0);
- --_killsRequested;
-}
-
-} // namespace mongo
diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h
index ba01d6458f5..ad5a9dfb82b 100644
--- a/src/mongo/db/session.h
+++ b/src/mongo/db/session.h
@@ -32,13 +32,12 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/db/logical_session_id.h"
+#include "mongo/db/operation_context.h"
#include "mongo/util/concurrency/with_lock.h"
#include "mongo/util/decorable.h"
namespace mongo {
-class OperationContext;
-
/**
* A decorable container for state associated with an active session running on a MongoD or MongoS
* server. Refer to SessionCatalog for more information on the semantics of sessions.
@@ -46,10 +45,11 @@ class OperationContext;
class Session : public Decorable<Session> {
MONGO_DISALLOW_COPYING(Session);
+ friend class ObservableSession;
friend class SessionCatalog;
public:
- explicit Session(LogicalSessionId sessionId);
+ explicit Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {}
/**
* The logical session id that this object represents.
@@ -58,72 +58,23 @@ public:
return _sessionId;
}
- /**
- * Returns a pointer to the current operation running on this Session, or nullptr if there is no
- * operation currently running on this Session.
- */
- OperationContext* currentOperation() const;
-
- /**
- * Increments the number of "killers" for this session and returns a 'kill token' to to be
- * passed later on to 'checkOutSessionForKill' method of the SessionCatalog in order to permit
- * the caller to execute any kill cleanup tasks. This token is later on passed to
- * '_markNotKilled' in order to decrement the number of "killers".
- *
- * Marking session as killed is an internal property only that will cause any further calls to
- * 'checkOutSession' to block until 'checkOutSessionForKill' is called the same number of times
- * as 'kill' was called and the returned scoped object destroyed.
- *
- * If the first killer finds the session checked-out, this method will also interrupt the
- * operation context which has it checked-out.
- *
- * Must be called under the owning SessionCatalog's lock.
- */
- struct KillToken {
- KillToken(LogicalSessionId lsid) : lsidToKill(std::move(lsid)) {}
- KillToken(KillToken&&) = default;
- KillToken& operator=(KillToken&&) = default;
-
- LogicalSessionId lsidToKill;
- };
- KillToken kill(WithLock sessionCatalogLock, ErrorCodes::Error reason = ErrorCodes::Interrupted);
-
- /**
- * Returns whether 'kill' has been called on this session.
- */
- bool killed() const;
+ OperationContext* currentOperation_forTest() const {
+ return _checkoutOpCtx;
+ }
private:
- /**
- * Set/clear the current check-out state of the session by storing the operation which has this
- * session currently checked-out.
- *
- * Must be called under the SessionCatalog mutex and internally will acquire the Session mutex.
- */
- void _markCheckedOut(WithLock sessionCatalogLock, OperationContext* checkoutOpCtx);
- void _markCheckedIn(WithLock sessionCatalogLock);
-
- /**
- * Used by the session catalog when checking a session back in after a call to 'kill'. See the
- * comments for 'kill for more details.
- */
- void _markNotKilled(WithLock sessionCatalogLock, KillToken killToken);
-
// The id of the session with which this object is associated
const LogicalSessionId _sessionId;
- // Protects the member variables below. The order of lock acquisition should always be:
- //
- // 1) SessionCatalog mutex (if applicable)
- // 2) Session mutex
- // 3) Any decoration mutexes and/or the currently running Client's lock
- mutable stdx::mutex _mutex;
-
// A pointer back to the currently running operation on this Session, or nullptr if there
// is no operation currently running for the Session.
+ //
+ // May be read by holders of the SessionCatalog mutex. May only be set when clear or cleared
+ // when set, and the opCtx being set or cleared must have its client locked at the time.
OperationContext* _checkoutOpCtx{nullptr};
- // Incremented every time 'kill' is invoked and decremented by '_markNotKilled'.
+ // Counter indicating the number of times ObservableSession::kill has been called on this
+ // session, which have not yet had a corresponding call to checkOutSessionForKill.
int _killsRequested{0};
};
diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp
index 4da7207b735..34bd12390e3 100644
--- a/src/mongo/db/session_catalog.cpp
+++ b/src/mongo/db/session_catalog.cpp
@@ -45,16 +45,16 @@ namespace {
const auto sessionTransactionTableDecoration = ServiceContext::declareDecoration<SessionCatalog>();
const auto operationSessionDecoration =
- OperationContext::declareDecoration<boost::optional<ScopedCheckedOutSession>>();
+ OperationContext::declareDecoration<boost::optional<SessionCatalog::ScopedCheckedOutSession>>();
} // namespace
SessionCatalog::~SessionCatalog() {
stdx::lock_guard<stdx::mutex> lg(_mutex);
for (const auto& entry : _sessions) {
- auto& sri = entry.second;
- invariant(!sri->session.currentOperation());
- invariant(!sri->session.killed());
+ ObservableSession session(lg, entry.second->session);
+ invariant(!session.currentOperation());
+ invariant(!session._killed());
}
}
@@ -72,56 +72,55 @@ SessionCatalog* SessionCatalog::get(ServiceContext* service) {
return &sessionTransactionTable;
}
-ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext* opCtx) {
- invariant(opCtx->getLogicalSessionId());
- return checkOutSession(opCtx, *opCtx->getLogicalSessionId());
-}
-
-ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext* opCtx,
- const LogicalSessionId& lsid) {
+SessionCatalog::ScopedCheckedOutSession SessionCatalog::_checkOutSession(OperationContext* opCtx) {
// This method is not supposed to be called with an already checked-out session due to risk of
// deadlock
+ invariant(opCtx->getLogicalSessionId());
invariant(!operationSessionDecoration(opCtx));
invariant(!opCtx->lockState()->inAWriteUnitOfWork());
invariant(!opCtx->lockState()->isLocked());
stdx::unique_lock<stdx::mutex> ul(_mutex);
- auto sri = _getOrCreateSessionRuntimeInfo(ul, opCtx, lsid);
+ auto sri = _getOrCreateSessionRuntimeInfo(ul, opCtx, *opCtx->getLogicalSessionId());
// Wait until the session is no longer checked out and until the previously scheduled kill has
// completed
- opCtx->waitForConditionOrInterrupt(sri->availableCondVar, ul, [&sri]() {
- return !sri->session.currentOperation() && !sri->session.killed();
+ opCtx->waitForConditionOrInterrupt(sri->availableCondVar, ul, [&ul, &sri]() {
+ ObservableSession osession(ul, sri->session);
+ return !osession.currentOperation() && !osession._killed();
});
- sri->session._markCheckedOut(ul, opCtx);
+ {
+ stdx::lock_guard<Client> lockClient(*opCtx->getClient());
+ sri->session._checkoutOpCtx = opCtx;
+ }
return ScopedCheckedOutSession(
*this, std::move(sri), boost::none /* Not checked out for kill */);
}
-ScopedCheckedOutSession SessionCatalog::checkOutSessionForKill(OperationContext* opCtx,
- Session::KillToken killToken) {
+SessionCatalog::SessionToKill SessionCatalog::checkOutSessionForKill(OperationContext* opCtx,
+ KillToken killToken) {
// This method is not supposed to be called with an already checked-out session due to risk of
// deadlock
invariant(!operationSessionDecoration(opCtx));
invariant(!opCtx->getTxnNumber());
- const auto lsid = killToken.lsidToKill;
-
stdx::unique_lock<stdx::mutex> ul(_mutex);
- auto sri = _getOrCreateSessionRuntimeInfo(ul, opCtx, lsid);
- invariant(sri->session.killed());
+ auto sri = _getOrCreateSessionRuntimeInfo(ul, opCtx, killToken.lsidToKill);
+ invariant(ObservableSession(ul, sri->session)._killed());
// Wait until the session is no longer checked out
- opCtx->waitForConditionOrInterrupt(
- sri->availableCondVar, ul, [&sri]() { return !sri->session.currentOperation(); });
+ opCtx->waitForConditionOrInterrupt(sri->availableCondVar, ul, [&ul, &sri]() {
+ return !ObservableSession(ul, sri->session).currentOperation();
+ });
- invariant(!sri->session.currentOperation());
- sri->session._markCheckedOut(ul, opCtx);
+ {
+ stdx::lock_guard<Client> lockClient(*opCtx->getClient());
+ sri->session._checkoutOpCtx = opCtx;
+ }
- return ScopedCheckedOutSession(
- *this, std::move(sri), std::move(killToken) /* Checked out for kill */);
+ return SessionToKill(ScopedCheckedOutSession(*this, std::move(sri), std::move(killToken)));
}
void SessionCatalog::scanSessions(const SessionKiller::Matcher& matcher,
@@ -132,18 +131,18 @@ void SessionCatalog::scanSessions(const SessionKiller::Matcher& matcher,
for (auto& sessionEntry : _sessions) {
if (matcher.match(sessionEntry.first)) {
- workerFn(lg, &sessionEntry.second->session);
+ workerFn({lg, sessionEntry.second->session});
}
}
}
-Session::KillToken SessionCatalog::killSession(const LogicalSessionId& lsid) {
+SessionCatalog::KillToken SessionCatalog::killSession(const LogicalSessionId& lsid) {
stdx::lock_guard<stdx::mutex> lg(_mutex);
auto it = _sessions.find(lsid);
uassert(ErrorCodes::NoSuchSession, "Session not found", it != _sessions.end());
auto& sri = it->second;
- return sri->session.kill(lg);
+ return ObservableSession(lg, sri->session).kill();
}
std::shared_ptr<SessionCatalog::SessionRuntimeInfo> SessionCatalog::_getOrCreateSessionRuntimeInfo(
@@ -157,21 +156,46 @@ std::shared_ptr<SessionCatalog::SessionRuntimeInfo> SessionCatalog::_getOrCreate
}
void SessionCatalog::_releaseSession(std::shared_ptr<SessionCatalog::SessionRuntimeInfo> sri,
- boost::optional<Session::KillToken> killToken) {
+ boost::optional<SessionCatalog::KillToken> killToken) {
stdx::lock_guard<stdx::mutex> lg(_mutex);
// Make sure we have exactly the same session on the map and that it is still associated with an
// operation context (meaning checked-out)
invariant(_sessions[sri->session.getSessionId()] == sri);
- invariant(sri->session.currentOperation());
-
- sri->session._markCheckedIn(lg);
+ invariant(sri->session._checkoutOpCtx);
+ {
+ stdx::lock_guard<Client> lockClient(*sri->session._checkoutOpCtx->getClient());
+ sri->session._checkoutOpCtx = nullptr;
+ }
sri->availableCondVar.notify_all();
if (killToken) {
- invariant(sri->session.killed());
- sri->session._markNotKilled(lg, std::move(*killToken));
+ invariant(sri->session._killsRequested > 0);
+ --sri->session._killsRequested;
+ }
+}
+
+OperationContext* ObservableSession::currentOperation() const {
+ return _session->_checkoutOpCtx;
+}
+
+SessionCatalog::KillToken ObservableSession::kill(ErrorCodes::Error reason) const {
+ const bool firstKiller = (0 == _session->_killsRequested);
+ ++_session->_killsRequested;
+
+ // For currently checked-out sessions, interrupt the operation context so that the current owner
+ // can release the session
+ if (firstKiller && _session->_checkoutOpCtx) {
+ invariant(_clientLock);
+ const auto serviceContext = _session->_checkoutOpCtx->getServiceContext();
+ serviceContext->killOperation(_session->_checkoutOpCtx, reason);
}
+
+ return SessionCatalog::KillToken(getSessionId());
+}
+
+bool ObservableSession::_killed() const {
+ return _session->_killsRequested > 0;
}
OperationContextSession::OperationContextSession(OperationContext* opCtx) : _opCtx(opCtx) {
@@ -217,7 +241,8 @@ void OperationContextSession::checkIn(OperationContext* opCtx) {
// but destruction of the checkedOutSession must not be, as it takes the SessionCatalog mutex,
// and other code may take the Client lock while holding that mutex.
stdx::unique_lock<Client> lk(*opCtx->getClient());
- ScopedCheckedOutSession sessionToReleaseOutOfLock(std::move(*checkedOutSession));
+ SessionCatalog::ScopedCheckedOutSession sessionToReleaseOutOfLock(
+ std::move(*checkedOutSession));
// This destroys the moved-from ScopedCheckedOutSession, and must be done within the client lock
checkedOutSession = boost::none;
@@ -229,7 +254,7 @@ void OperationContextSession::checkOut(OperationContext* opCtx) {
invariant(!checkedOutSession);
const auto catalog = SessionCatalog::get(opCtx);
- auto scopedCheckedOutSession = catalog->checkOutSession(opCtx);
+ auto scopedCheckedOutSession = catalog->_checkOutSession(opCtx);
// We acquire a Client lock here to guard the construction of this session so that references to
// this session are safe to use while the lock is held
diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h
index 1ead5cad736..c22c96ff8ba 100644
--- a/src/mongo/db/session_catalog.h
+++ b/src/mongo/db/session_catalog.h
@@ -34,7 +34,9 @@
#include <vector>
#include "mongo/base/disallow_copying.h"
+#include "mongo/db/client.h"
#include "mongo/db/logical_session_id.h"
+#include "mongo/db/operation_context.h"
#include "mongo/db/session.h"
#include "mongo/db/session_killer.h"
#include "mongo/stdx/condition_variable.h"
@@ -44,10 +46,7 @@
namespace mongo {
-class OperationContext;
-class ScopedSession;
-class ScopedCheckedOutSession;
-class ServiceContext;
+class ObservableSession;
/**
* Keeps track of the transaction runtime state for every active session on this instance.
@@ -55,10 +54,21 @@ class ServiceContext;
class SessionCatalog {
MONGO_DISALLOW_COPYING(SessionCatalog);
- friend class ScopedSession;
- friend class ScopedCheckedOutSession;
+ friend class ObservableSession;
+ friend class OperationContextSession;
public:
+ class ScopedCheckedOutSession;
+ class SessionToKill;
+
+ struct KillToken {
+ KillToken(LogicalSessionId lsid) : lsidToKill(std::move(lsid)) {}
+ KillToken(KillToken&&) = default;
+ KillToken& operator=(KillToken&&) = default;
+
+ LogicalSessionId lsidToKill;
+ };
+
SessionCatalog() = default;
~SessionCatalog();
@@ -75,28 +85,10 @@ public:
void reset_forTest();
/**
- * Potentially blocking call, which either creates a brand new session object (if one doesn't
- * exist) or "checks-out" the existing one (if it is not currently in use or marked for kill).
- *
- * The 'opCtx'-only variant uses the session information stored on the operation context and the
- * variant, which has the 'lsid' parameter checks-out that session id. Neither of these methods
- * can be called with an already checked-out session.
- *
- * Checking out a session puts it in the 'checked out' state and all subsequent calls to
- * checkout will block until it is checked back in. This happens when the returned object goes
- * out of scope.
- *
- * Throws exception on errors.
- */
- ScopedCheckedOutSession checkOutSession(OperationContext* opCtx);
- ScopedCheckedOutSession checkOutSession(OperationContext* opCtx, const LogicalSessionId& lsid);
-
- /**
- * See the description of 'Session::kill' for more information on the session kill usage
- * pattern.
+ * See the description of 'ObservableSession::kill' for more information on the session kill
+ * usage pattern.
*/
- ScopedCheckedOutSession checkOutSessionForKill(OperationContext* opCtx,
- Session::KillToken killToken);
+ SessionToKill checkOutSessionForKill(OperationContext* opCtx, KillToken killToken);
/**
* Iterates through the SessionCatalog under the SessionCatalog mutex and applies 'workerFn' to
@@ -109,7 +101,7 @@ public:
*
* TODO SERVER-33850: Take Matcher out of the SessionKiller namespace.
*/
- using ScanSessionsCallbackFn = stdx::function<void(WithLock, Session*)>;
+ using ScanSessionsCallbackFn = stdx::function<void(const ObservableSession&)>;
void scanSessions(const SessionKiller::Matcher& matcher,
const ScanSessionsCallbackFn& workerFn);
@@ -117,7 +109,7 @@ public:
* Shortcut to invoke 'kill' on the specified session under the SessionCatalog mutex. Throws a
* NoSuchSession exception if the session doesn't exist.
*/
- Session::KillToken killSession(const LogicalSessionId& lsid);
+ KillToken killSession(const LogicalSessionId& lsid);
private:
struct SessionRuntimeInfo {
@@ -132,6 +124,8 @@ private:
stdx::condition_variable availableCondVar;
};
+ ScopedCheckedOutSession _checkOutSession(OperationContext* opCtx);
+
/**
* May release and re-acquire it zero or more times before returning. The returned
* 'SessionRuntimeInfo' is guaranteed to be linked on the catalog's _txnTable as long as the
@@ -144,7 +138,7 @@ private:
* Makes a session, previously checked out through 'checkoutSession', available again.
*/
void _releaseSession(std::shared_ptr<SessionRuntimeInfo> sri,
- boost::optional<Session::KillToken> killToken);
+ boost::optional<KillToken> killToken);
stdx::mutex _mutex;
@@ -153,19 +147,20 @@ private:
};
/**
- * Scoped object representing a checked-out session. See comments for the 'checkoutSession' method
- * for more information on its behaviour.
+ * Scoped object representing a checked-out session. This type is an implementation detail
+ * of the SessionCatalog.
*/
-class ScopedCheckedOutSession {
- MONGO_DISALLOW_COPYING(ScopedCheckedOutSession);
-
- friend ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext*,
- const LogicalSessionId&);
- friend ScopedCheckedOutSession SessionCatalog::checkOutSessionForKill(OperationContext*,
- Session::KillToken);
-
+class SessionCatalog::ScopedCheckedOutSession {
public:
+ ScopedCheckedOutSession(SessionCatalog& catalog,
+ std::shared_ptr<SessionCatalog::SessionRuntimeInfo> sri,
+ boost::optional<SessionCatalog::KillToken> killToken)
+ : _catalog(catalog), _sri(std::move(sri)), _killToken(std::move(killToken)) {}
+
ScopedCheckedOutSession(ScopedCheckedOutSession&&) = default;
+ ScopedCheckedOutSession& operator=(ScopedCheckedOutSession&&) = delete;
+ ScopedCheckedOutSession(const ScopedCheckedOutSession&) = delete;
+ ScopedCheckedOutSession& operator=(ScopedCheckedOutSession&) = delete;
~ScopedCheckedOutSession() {
if (_sri) {
@@ -190,19 +185,117 @@ public:
}
private:
- ScopedCheckedOutSession(SessionCatalog& catalog,
- std::shared_ptr<SessionCatalog::SessionRuntimeInfo> sri,
- boost::optional<Session::KillToken> killToken)
- : _catalog(catalog), _sri(std::move(sri)), _killToken(std::move(killToken)) {}
-
// The owning session catalog into which the session should be checked back
SessionCatalog& _catalog;
std::shared_ptr<SessionCatalog::SessionRuntimeInfo> _sri;
+ boost::optional<SessionCatalog::KillToken> _killToken;
+};
+
+/**
+ * RAII type returned by SessionCatalog::checkOutSessionForKill.
+ *
+ * After calling kill() on an ObservableSession, let that ObservableSession go out
+ * of scope and in a context outside of SessionCatalog::scanSessions, call checkOutSessionForKill
+ * to get an instance of this type. Then, while holding that instance, perform any cleanup
+ * you need to perform on a session as part of killing it. More details in the description of
+ * ObservableSession::kill, below.
+ */
+class SessionCatalog::SessionToKill {
+public:
+ SessionToKill(ScopedCheckedOutSession&& scos) : _scos(std::move(scos)) {}
+ Session* get() const {
+ return _scos.get();
+ }
+
+ const LogicalSessionId& getSessionId() const {
+ return get()->getSessionId();
+ }
+ OperationContext* currentOperation_forTest() const {
+ return get()->currentOperation_forTest();
+ }
- // Only set if the session was obtained though checkOutSessionForKill
- boost::optional<Session::KillToken> _killToken;
+private:
+ ScopedCheckedOutSession _scos;
};
+using SessionToKill = SessionCatalog::SessionToKill;
+
+/**
+ * This type represents access to a session inside of a scanSessions loop.
+ * If you have one of these, you're in a scanSessions callback context, and so
+ * have locked the whole catalog and, if the observed session is bound to an operation context,
+ * you hold that operation context's client's mutex, as well.
+ */
+class ObservableSession {
+public:
+ ObservableSession(const ObservableSession&) = delete;
+ ObservableSession(ObservableSession&&) = delete;
+ ObservableSession& operator=(const ObservableSession&) = delete;
+ ObservableSession& operator=(ObservableSession&&) = delete;
+
+ /**
+ * The logical session id that this object represents.
+ */
+ const LogicalSessionId& getSessionId() const {
+ return _session->_sessionId;
+ }
+
+ /**
+ * Returns a pointer to the current operation running on this Session, or nullptr if there is no
+ * operation currently running on this Session.
+ */
+ OperationContext* currentOperation() const;
+
+ /**
+ * Increments the number of "killers" for this session and returns a 'kill token' to to be
+ * passed later on to 'checkOutSessionForKill' method of the SessionCatalog in order to permit
+ * the caller to execute any kill cleanup tasks. This token is later on passed to
+ * '_markNotKilled' in order to decrement the number of "killers".
+ *
+ * Marking session as killed is an internal property only that will cause any further calls to
+ * 'checkOutSession' to block until 'checkOutSessionForKill' is called the same number of times
+ * as 'kill' was called and the returned scoped object destroyed.
+ *
+ * If the first killer finds the session checked-out, this method will also interrupt the
+ * operation context which has it checked-out.
+ */
+ SessionCatalog::KillToken kill(ErrorCodes::Error reason = ErrorCodes::Interrupted) const;
+
+ /**
+ * Returns a pointer to the Session itself.
+ */
+ Session* get() const {
+ return _session;
+ }
+
+private:
+ friend class SessionCatalog;
+
+ static stdx::unique_lock<Client> _lockClientForSession(WithLock, Session* session) {
+ if (const auto opCtx = session->_checkoutOpCtx) {
+ return stdx::unique_lock<Client>{*opCtx->getClient()};
+ }
+ return {};
+ }
+
+ ObservableSession(WithLock wl, Session& session)
+ : _session(&session), _clientLock(_lockClientForSession(std::move(wl), _session)) {}
+
+ /**
+ * Returns whether 'kill' has been called on this session.
+ */
+ bool _killed() const;
+
+ /**
+ * Used by the session catalog when checking a session back in after a call to 'kill'. See the
+ * comments for 'kill for more details.
+ */
+ void _markNotKilled(WithLock sessionCatalogLock, SessionCatalog::KillToken killToken);
+
+ Session* _session;
+ stdx::unique_lock<Client> _clientLock;
+};
+
/**
* Scoped object, which checks out the session specified in the passed operation context and stores
@@ -213,11 +306,17 @@ class OperationContextSession {
MONGO_DISALLOW_COPYING(OperationContextSession);
public:
+ /**
+ * Acquires the session with id opCtx->getLogicalSessionId(). Because a session can only be
+ * checked out by one user at a time, construction of OperationContextSession can block waiting
+ * for the desired session to be checked in by another user.
+ */
OperationContextSession(OperationContext* opCtx);
~OperationContextSession();
/**
- * Returns the session checked out in the constructor.
+ * Returns the session currently checked out by "opCtx", or nullptr if the opCtx has no
+ * checked out session.
*/
static Session* get(OperationContext* opCtx);
diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp
index c5fae50fe9c..dd6072af24e 100644
--- a/src/mongo/db/session_catalog_mongod.cpp
+++ b/src/mongo/db/session_catalog_mongod.cpp
@@ -74,8 +74,9 @@ auto getThreadPool(OperationContext* opCtx) {
return &sessionTasksExecutor(opCtx->getServiceContext()).threadPool;
}
-void killSessionTokensFunction(OperationContext* opCtx,
- std::shared_ptr<std::vector<Session::KillToken>> sessionKillTokens) {
+void killSessionTokensFunction(
+ OperationContext* opCtx,
+ std::shared_ptr<std::vector<SessionCatalog::KillToken>> sessionKillTokens) {
if (sessionKillTokens->empty())
return;
@@ -105,7 +106,7 @@ void MongoDSessionCatalog::onStepUp(OperationContext* opCtx) {
const auto catalog = SessionCatalog::get(opCtx);
// The use of shared_ptr here is in order to work around the limitation of stdx::function that
// the functor must be copyable.
- auto sessionKillTokens = std::make_shared<std::vector<Session::KillToken>>();
+ auto sessionKillTokens = std::make_shared<std::vector<SessionCatalog::KillToken>>();
// Scan all sessions and reacquire locks for prepared transactions.
// There may be sessions that are checked out during this scan, but none of them
@@ -115,14 +116,14 @@ void MongoDSessionCatalog::onStepUp(OperationContext* opCtx) {
SessionKiller::Matcher matcher(
KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)});
- catalog->scanSessions(matcher, [&](WithLock sessionCatalogLock, Session* session) {
- const auto txnParticipant = TransactionParticipant::get(session);
+ catalog->scanSessions(matcher, [&](const ObservableSession& session) {
+ const auto txnParticipant = TransactionParticipant::get(session.get());
if (!txnParticipant->inMultiDocumentTransaction()) {
- sessionKillTokens->emplace_back(session->kill(sessionCatalogLock));
+ sessionKillTokens->emplace_back(session.kill());
}
if (txnParticipant->transactionIsPrepared()) {
- sessionIdToReacquireLocks.emplace_back(session->getSessionId());
+ sessionIdToReacquireLocks.emplace_back(session.getSessionId());
}
});
killSessionTokensFunction(opCtx, sessionKillTokens);
@@ -131,10 +132,12 @@ void MongoDSessionCatalog::onStepUp(OperationContext* opCtx) {
// Create a new opCtx because we need an empty locker to refresh the locks.
auto newClient = opCtx->getServiceContext()->makeClient("restore-prepared-txn");
AlternativeClientRegion acr(newClient);
- auto newOpCtx = cc().makeOperationContext();
for (const auto& sessionId : sessionIdToReacquireLocks) {
- auto scopedSessionCheckOut = catalog->checkOutSession(newOpCtx.get(), sessionId);
- auto txnParticipant = TransactionParticipant::get(scopedSessionCheckOut.get());
+ auto newOpCtx = cc().makeOperationContext();
+ newOpCtx->setLogicalSessionId(sessionId);
+ MongoDOperationContextSession ocs(newOpCtx.get());
+ auto txnParticipant =
+ TransactionParticipant::get(OperationContextSession::get(newOpCtx.get()));
txnParticipant->refreshLocksForPreparedTransaction(newOpCtx.get(), false);
}
}
@@ -195,7 +198,7 @@ void MongoDSessionCatalog::invalidateSessions(OperationContext* opCtx,
// The use of shared_ptr here is in order to work around the limitation of stdx::function that
// the functor must be copyable.
- auto sessionKillTokens = std::make_shared<std::vector<Session::KillToken>>();
+ auto sessionKillTokens = std::make_shared<std::vector<SessionCatalog::KillToken>>();
if (singleSessionDoc) {
sessionKillTokens->emplace_back(catalog->killSession(LogicalSessionId::parse(
@@ -203,10 +206,9 @@ void MongoDSessionCatalog::invalidateSessions(OperationContext* opCtx,
} else {
SessionKiller::Matcher matcher(
KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)});
- catalog->scanSessions(
- matcher, [&sessionKillTokens](WithLock sessionCatalogLock, Session* session) {
- sessionKillTokens->emplace_back(session->kill(sessionCatalogLock));
- });
+ catalog->scanSessions(matcher, [&sessionKillTokens](const ObservableSession& session) {
+ sessionKillTokens->emplace_back(session.kill());
+ });
}
killSessionTokensFunction(opCtx, sessionKillTokens);
diff --git a/src/mongo/db/session_catalog_test.cpp b/src/mongo/db/session_catalog_test.cpp
index 9d9e532abad..b65a933d6c7 100644
--- a/src/mongo/db/session_catalog_test.cpp
+++ b/src/mongo/db/session_catalog_test.cpp
@@ -74,11 +74,11 @@ private:
TEST_F(SessionCatalogTestWithDefaultOpCtx, CheckoutAndReleaseSession) {
_opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
+ OperationContextSession ocs(_opCtx);
- auto scopedSession = catalog()->checkOutSession(_opCtx);
-
- ASSERT(scopedSession.get());
- ASSERT_EQ(*_opCtx->getLogicalSessionId(), scopedSession->getSessionId());
+ auto session = OperationContextSession::get(_opCtx);
+ ASSERT(session);
+ ASSERT_EQ(*_opCtx->getLogicalSessionId(), session->getSessionId());
}
TEST_F(SessionCatalogTestWithDefaultOpCtx, OperationContextCheckedOutSession) {
@@ -120,8 +120,8 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, NestedOperationContextSession) {
TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessions) {
std::vector<LogicalSessionId> lsidsFound;
- const auto workerFn = [&lsidsFound](WithLock, Session* session) {
- lsidsFound.push_back(session->getSessionId());
+ const auto workerFn = [&lsidsFound](const ObservableSession& session) {
+ lsidsFound.push_back(session.getSessionId());
};
// Scan over zero Sessions.
@@ -139,7 +139,8 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessions) {
stdx::async(stdx::launch::async, [this, lsid] {
ThreadClient tc(getServiceContext());
auto opCtx = makeOperationContext();
- const auto unusedSession(catalog()->checkOutSession(opCtx.get(), lsid));
+ opCtx->setLogicalSessionId(lsid);
+ OperationContextSession ocs(opCtx.get());
}).get();
}
@@ -163,7 +164,8 @@ TEST_F(SessionCatalogTest, KillSessionWhenSessionIsNotCheckedOut) {
// Create the session so there is something to kill
{
auto opCtx = makeOperationContext();
- const auto unusedSession(catalog()->checkOutSession(opCtx.get(), lsid));
+ opCtx->setLogicalSessionId(lsid);
+ OperationContextSession ocs(opCtx.get());
}
auto killToken = catalog()->killSession(lsid);
@@ -182,7 +184,8 @@ TEST_F(SessionCatalogTest, KillSessionWhenSessionIsNotCheckedOut) {
auto future = stdx::async(stdx::launch::async, [this, lsid] {
ThreadClient tc(getServiceContext());
auto sideOpCtx = Client::getCurrent()->makeOperationContext();
- const auto unusedSession(catalog()->checkOutSession(sideOpCtx.get(), lsid));
+ sideOpCtx->setLogicalSessionId(lsid);
+ OperationContextSession ocs(sideOpCtx.get());
});
ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration()));
@@ -190,13 +193,14 @@ TEST_F(SessionCatalogTest, KillSessionWhenSessionIsNotCheckedOut) {
{
auto opCtx = makeOperationContext();
auto scopedSession = catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken));
- ASSERT_EQ(opCtx.get(), scopedSession->currentOperation());
+ ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest());
}
// Make sure that session check-out after kill succeeds again
{
auto opCtx = makeOperationContext();
- const auto unusedSession(catalog()->checkOutSession(opCtx.get(), lsid));
+ opCtx->setLogicalSessionId(lsid);
+ OperationContextSession ocs(opCtx.get());
}
// Make sure the "regular operation" eventually is able to proceed and use the just killed
@@ -225,8 +229,7 @@ TEST_F(SessionCatalogTest, KillSessionWhenSessionIsCheckedOut) {
auto sideOpCtx = Client::getCurrent()->makeOperationContext();
sideOpCtx->setLogicalSessionId(lsid);
sideOpCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired);
-
- const auto unusedSession(catalog()->checkOutSession(sideOpCtx.get(), lsid));
+ OperationContextSession ocs(sideOpCtx.get());
});
ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::MaxTimeMSExpired);
@@ -248,7 +251,8 @@ TEST_F(SessionCatalogTest, KillSessionWhenSessionIsCheckedOut) {
auto future = stdx::async(stdx::launch::async, [this, lsid] {
ThreadClient tc(getServiceContext());
auto sideOpCtx = Client::getCurrent()->makeOperationContext();
- const auto unusedSession(catalog()->checkOutSession(sideOpCtx.get(), lsid));
+ sideOpCtx->setLogicalSessionId(lsid);
+ OperationContextSession ocs(sideOpCtx.get());
});
ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration()));
@@ -256,13 +260,14 @@ TEST_F(SessionCatalogTest, KillSessionWhenSessionIsCheckedOut) {
{
auto opCtx = makeOperationContext();
auto scopedSession = catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken));
- ASSERT_EQ(opCtx.get(), scopedSession->currentOperation());
+ ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest());
}
// Make sure that session check-out after kill succeeds again
{
auto opCtx = makeOperationContext();
- const auto unusedSession(catalog()->checkOutSession(opCtx.get(), lsid));
+ opCtx->setLogicalSessionId(lsid);
+ OperationContextSession ocs(opCtx.get());
}
// Make sure the "regular operation" eventually is able to proceed and use the just killed
@@ -276,7 +281,8 @@ TEST_F(SessionCatalogTest, MarkSessionAsKilledCanBeCalledMoreThanOnce) {
// Create the session so there is something to kill
{
auto opCtx = makeOperationContext();
- const auto unusedSession(catalog()->checkOutSession(opCtx.get(), lsid));
+ opCtx->setLogicalSessionId(lsid);
+ OperationContextSession ocs(opCtx.get());
}
auto killToken1 = catalog()->killSession(lsid);
@@ -292,13 +298,13 @@ TEST_F(SessionCatalogTest, MarkSessionAsKilledCanBeCalledMoreThanOnce) {
OperationContextSession(opCtx.get()), AssertionException, ErrorCodes::MaxTimeMSExpired);
}
- boost::optional<Session::KillToken> killTokenWhileSessionIsCheckedOutForKill;
+ boost::optional<SessionCatalog::KillToken> killTokenWhileSessionIsCheckedOutForKill;
// Finish the first killer of the session
{
auto opCtx = makeOperationContext();
auto scopedSession = catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken1));
- ASSERT_EQ(opCtx.get(), scopedSession->currentOperation());
+ ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest());
// Killing a session while checked out for kill should not affect the killers
killTokenWhileSessionIsCheckedOutForKill.emplace(catalog()->killSession(lsid));
@@ -316,13 +322,13 @@ TEST_F(SessionCatalogTest, MarkSessionAsKilledCanBeCalledMoreThanOnce) {
{
auto opCtx = makeOperationContext();
auto scopedSession = catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken2));
- ASSERT_EQ(opCtx.get(), scopedSession->currentOperation());
+ ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest());
}
{
auto opCtx = makeOperationContext();
auto scopedSession = catalog()->checkOutSessionForKill(
opCtx.get(), std::move(*killTokenWhileSessionIsCheckedOutForKill));
- ASSERT_EQ(opCtx.get(), scopedSession->currentOperation());
+ ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest());
}
}
@@ -379,7 +385,8 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, KillSessionsThroughScanSessions) {
{
auto sideOpCtx = Client::getCurrent()->makeOperationContext();
- const auto unusedSession(catalog()->checkOutSession(sideOpCtx.get(), lsid));
+ sideOpCtx->setLogicalSessionId(lsid);
+ OperationContextSession ocs(sideOpCtx.get());
firstUseOfTheSessionReachedBarrier.countDownAndWait();
@@ -391,8 +398,7 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, KillSessionsThroughScanSessions) {
{
auto sideOpCtx = Client::getCurrent()->makeOperationContext();
sideOpCtx->setLogicalSessionId(lsid);
-
- const auto unusedSession(catalog()->checkOutSession(sideOpCtx.get(), lsid));
+ OperationContextSession ocs(sideOpCtx.get());
}
}));
}
@@ -402,14 +408,13 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, KillSessionsThroughScanSessions) {
// Kill the first and the third sessions
{
- std::vector<Session::KillToken> firstAndThirdTokens;
+ std::vector<SessionCatalog::KillToken> firstAndThirdTokens;
catalog()->scanSessions(
SessionKiller::Matcher(
KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(_opCtx)}),
- [&lsids, &firstAndThirdTokens](WithLock sessionCatalogLock, Session* session) {
- if (session->getSessionId() == lsids[0] || session->getSessionId() == lsids[2])
- firstAndThirdTokens.emplace_back(
- session->kill(sessionCatalogLock, ErrorCodes::ExceededTimeLimit));
+ [&lsids, &firstAndThirdTokens](const ObservableSession& session) {
+ if (session.getSessionId() == lsids[0] || session.getSessionId() == lsids[2])
+ firstAndThirdTokens.emplace_back(session.kill(ErrorCodes::ExceededTimeLimit));
});
ASSERT_EQ(2U, firstAndThirdTokens.size());
for (auto& killToken : firstAndThirdTokens) {
@@ -422,14 +427,13 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, KillSessionsThroughScanSessions) {
// Kill the second session
{
- std::vector<Session::KillToken> secondToken;
+ std::vector<SessionCatalog::KillToken> secondToken;
catalog()->scanSessions(
SessionKiller::Matcher(
KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(_opCtx)}),
- [&lsids, &secondToken](WithLock sessionCatalogLock, Session* session) {
- if (session->getSessionId() == lsids[1])
- secondToken.emplace_back(
- session->kill(sessionCatalogLock, ErrorCodes::ExceededTimeLimit));
+ [&lsids, &secondToken](const ObservableSession& session) {
+ if (session.getSessionId() == lsids[1])
+ secondToken.emplace_back(session.kill(ErrorCodes::ExceededTimeLimit));
});
ASSERT_EQ(1U, secondToken.size());
for (auto& killToken : secondToken) {
@@ -472,13 +476,13 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, ConcurrentCheckOutAndKill) {
sideOpCtx->setLogicalSessionId(lsid);
// Kill the session
- std::vector<Session::KillToken> killTokens;
- catalog()->scanSessions(SessionKiller::Matcher(KillAllSessionsByPatternSet{
- makeKillAllSessionsByPattern(sideOpCtx.get())}),
- [&killTokens](WithLock sessionCatalogLock, Session* session) {
- killTokens.emplace_back(session->kill(
- sessionCatalogLock, ErrorCodes::InternalError));
- });
+ std::vector<SessionCatalog::KillToken> killTokens;
+ catalog()->scanSessions(
+ SessionKiller::Matcher(
+ KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(sideOpCtx.get())}),
+ [&killTokens](const ObservableSession& session) {
+ killTokens.emplace_back(session.kill(ErrorCodes::InternalError));
+ });
ASSERT_EQ(1U, killTokens.size());
auto checkOutSessionForKill(
catalog()->checkOutSessionForKill(sideOpCtx.get(), std::move(killTokens[0])));
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 066790d2236..83361ba3be8 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -307,7 +307,7 @@ const LogicalSessionId& TransactionParticipant::_sessionId() const {
OperationContext* TransactionParticipant::_opCtx() const {
const auto* owningSession = getTransactionParticipant.owner(this);
- auto* opCtx = owningSession->currentOperation();
+ auto* opCtx = owningSession->currentOperation_forTest();
invariant(opCtx);
return opCtx;
}
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index b20d33efa3a..16c2be1b588 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -1144,9 +1144,8 @@ TEST_F(TxnParticipantTest, CannotStartNewTransactionWhilePreparedTransactionInPr
](OperationContext * newOpCtx) {
newOpCtx->setLogicalSessionId(lsid);
newOpCtx->setTxnNumber(txnNumberToStart);
-
- auto session = SessionCatalog::get(newOpCtx)->checkOutSession(newOpCtx);
- auto txnParticipant = TransactionParticipant::get(session.get());
+ MongoDOperationContextSession ocs(newOpCtx);
+ auto txnParticipant = TransactionParticipant::get(newOpCtx);
ASSERT_THROWS_CODE(txnParticipant->beginOrContinue(txnNumberToStart, false, true),
AssertionException,