diff options
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/commands/SConscript | 73 | ||||
-rw-r--r-- | src/mongo/db/commands/logical_session_server_status_section.cpp (renamed from src/mongo/db/logical_session_server_status_section.cpp) | 14 | ||||
-rw-r--r-- | src/mongo/db/commands/reap_logical_session_cache_now.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/commands/refresh_logical_session_cache_now.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/commands/refresh_sessions_command.cpp | 22 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_set_commands.cpp | 39 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_service.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/session.h | 3 | ||||
-rw-r--r-- | src/mongo/db/session_catalog.cpp | 65 | ||||
-rw-r--r-- | src/mongo/db/session_catalog.h | 38 | ||||
-rw-r--r-- | src/mongo/db/session_catalog_mongod.cpp | 37 | ||||
-rw-r--r-- | src/mongo/db/session_catalog_mongod.h | 2 | ||||
-rw-r--r-- | src/mongo/db/session_catalog_test.cpp | 102 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection.h | 5 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 12 |
17 files changed, 333 insertions, 124 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index c6fee300b60..de9d62a0e49 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1529,7 +1529,6 @@ env.Library( source=[ 'initialize_operation_session_info.cpp', 'logical_session_cache_impl.cpp', - 'logical_session_server_status_section.cpp', ], LIBDEPS=[ 'logical_session_cache', @@ -1540,7 +1539,6 @@ env.Library( 'kill_sessions', ], LIBDEPS_PRIVATE=[ - '$BUILD_DIR/mongo/db/commands/server_status', '$BUILD_DIR/mongo/db/s/sharding_api_d', ] ) diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 8c7fc3639ad..2d1be53c64f 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -91,21 +91,21 @@ env.Library( # Commands available in every process that executes commands env.Library( - target="core", + target='core', source=[ - "end_sessions_command.cpp", - "fail_point_cmd.cpp", - "find_and_modify_common.cpp", - "generic.cpp", - "hashcmd.cpp", - "kill_all_sessions_by_pattern_command.cpp", - "kill_all_sessions_command.cpp", - "kill_sessions_command.cpp", - "parameters.cpp", - "refresh_logical_session_cache_now.cpp", - "refresh_sessions_command.cpp", - "rename_collection_common.cpp", - "start_session_command.cpp", + 'end_sessions_command.cpp', + 'fail_point_cmd.cpp', + 'find_and_modify_common.cpp', + 'generic.cpp', + 'hashcmd.cpp', + 'kill_all_sessions_by_pattern_command.cpp', + 'kill_all_sessions_command.cpp', + 'kill_sessions_command.cpp', + 'parameters.cpp', + 'refresh_logical_session_cache_now.cpp', + 'refresh_sessions_command.cpp', + 'rename_collection_common.cpp', + 'start_session_command.cpp', env.Idlc('end_sessions.idl')[0], env.Idlc('parameters.idl')[0], ], @@ -114,38 +114,39 @@ env.Library( '$BUILD_DIR/mongo/db/auth/auth', '$BUILD_DIR/mongo/db/auth/authprivilege', '$BUILD_DIR/mongo/db/commands', - '$BUILD_DIR/mongo/db/commands/test_commands_enabled', '$BUILD_DIR/mongo/db/common', '$BUILD_DIR/mongo/db/kill_sessions', - 
'$BUILD_DIR/mongo/db/logical_session_cache', '$BUILD_DIR/mongo/db/logical_session_cache_impl', - '$BUILD_DIR/mongo/db/logical_session_id', + '$BUILD_DIR/mongo/db/logical_session_cache', '$BUILD_DIR/mongo/db/logical_session_id_helpers', + '$BUILD_DIR/mongo/db/logical_session_id', '$BUILD_DIR/mongo/db/mongohasher', '$BUILD_DIR/mongo/db/server_options_core', '$BUILD_DIR/mongo/idl/server_parameter', '$BUILD_DIR/mongo/logger/parse_log_component_settings', '$BUILD_DIR/mongo/rpc/protocol', + 'test_commands_enabled', ], ) # Commands available in all mongodb server processes (mongod, mongos, etc.) env.Library( - target="servers", + target='servers', source=[ - "authentication_commands.cpp", - "conn_pool_stats.cpp", - "conn_pool_sync.cpp", - "connection_status.cpp", - "drop_connections_command.cpp", + 'authentication_commands.cpp', + 'conn_pool_stats.cpp', + 'conn_pool_sync.cpp', + 'connection_status.cpp', + 'drop_connections_command.cpp', + 'generic_servers.cpp', + 'isself.cpp', + 'logical_session_server_status_section.cpp', + 'mr_common.cpp', + 'reap_logical_session_cache_now.cpp', + 'refresh_sessions_command_internal.cpp', + 'traffic_recording_cmds.cpp', + 'user_management_commands_common.cpp', env.Idlc('drop_connections.idl')[0], - "generic_servers.cpp", - "isself.cpp", - "mr_common.cpp", - "reap_logical_session_cache_now.cpp", - "refresh_sessions_command_internal.cpp", - "traffic_recording_cmds.cpp", - "user_management_commands_common.cpp", ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/client/clientdriver_minimal', @@ -153,25 +154,27 @@ env.Library( '$BUILD_DIR/mongo/db/auth/sasl_options', '$BUILD_DIR/mongo/db/auth/user_document_parser', '$BUILD_DIR/mongo/db/commands', - '$BUILD_DIR/mongo/db/commands/test_commands_enabled', '$BUILD_DIR/mongo/db/common', '$BUILD_DIR/mongo/db/log_process_details', - '$BUILD_DIR/mongo/db/logical_session_cache', '$BUILD_DIR/mongo/db/logical_session_cache_impl', - '$BUILD_DIR/mongo/db/logical_session_id', + 
'$BUILD_DIR/mongo/db/logical_session_cache', '$BUILD_DIR/mongo/db/logical_session_id_helpers', + '$BUILD_DIR/mongo/db/logical_session_id', '$BUILD_DIR/mongo/db/repl/isself', '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', - '$BUILD_DIR/mongo/executor/egress_tag_closer_manager', + '$BUILD_DIR/mongo/db/session_catalog', '$BUILD_DIR/mongo/db/traffic_recorder', + '$BUILD_DIR/mongo/executor/egress_tag_closer_manager', '$BUILD_DIR/mongo/executor/task_executor_pool', '$BUILD_DIR/mongo/rpc/client_metadata', - '$BUILD_DIR/mongo/s/sharding_legacy_api', '$BUILD_DIR/mongo/s/coreshard', + '$BUILD_DIR/mongo/s/sharding_legacy_api', '$BUILD_DIR/mongo/scripting/scripting_common', '$BUILD_DIR/mongo/util/ntservice', 'core', 'feature_compatibility_parsers', + 'server_status', + 'test_commands_enabled', ] ) diff --git a/src/mongo/db/logical_session_server_status_section.cpp b/src/mongo/db/commands/logical_session_server_status_section.cpp index 971748f7de6..b23605a5506 100644 --- a/src/mongo/db/logical_session_server_status_section.cpp +++ b/src/mongo/db/commands/logical_session_server_status_section.cpp @@ -35,14 +35,11 @@ #include "mongo/db/operation_context.h" namespace mongo { - namespace { -class LogicalSessionSSS : public ServerStatusSection { +class LogicalSessionServerStatusSection : public ServerStatusSection { public: - LogicalSessionSSS() : ServerStatusSection("logicalSessionRecordCache") {} - - ~LogicalSessionSSS() override = default; + LogicalSessionServerStatusSection() : ServerStatusSection("logicalSessionRecordCache") {} bool includeByDefault() const override { return true; @@ -50,11 +47,12 @@ public: BSONObj generateSection(OperationContext* opCtx, const BSONElement& configElement) const override { - auto lsCache = LogicalSessionCache::get(opCtx); - return lsCache ? lsCache->getStats().toBSON() : BSONObj(); + const auto logicalSessionCache = LogicalSessionCache::get(opCtx); + + return logicalSessionCache ? 
logicalSessionCache->getStats().toBSON() : BSONObj(); } -} LogicalSessionSSS; +} logicalSessionsServerStatusSection; } // namespace } // namespace mongo diff --git a/src/mongo/db/commands/reap_logical_session_cache_now.cpp b/src/mongo/db/commands/reap_logical_session_cache_now.cpp index 8379169e84e..e60e0fb28fd 100644 --- a/src/mongo/db/commands/reap_logical_session_cache_now.cpp +++ b/src/mongo/db/commands/reap_logical_session_cache_now.cpp @@ -37,13 +37,9 @@ #include "mongo/db/operation_context.h" namespace mongo { - namespace { class ReapLogicalSessionCacheNowCommand final : public BasicCommand { - ReapLogicalSessionCacheNowCommand(const ReapLogicalSessionCacheNowCommand&) = delete; - ReapLogicalSessionCacheNowCommand& operator=(const ReapLogicalSessionCacheNowCommand&) = delete; - public: ReapLogicalSessionCacheNowCommand() : BasicCommand("reapLogicalSessionCacheNow") {} @@ -70,10 +66,10 @@ public: return Status::OK(); } - virtual bool run(OperationContext* opCtx, - const std::string& db, - const BSONObj& cmdObj, - BSONObjBuilder& result) override { + bool run(OperationContext* opCtx, + const std::string& db, + const BSONObj& cmdObj, + BSONObjBuilder& result) override { auto cache = LogicalSessionCache::get(opCtx); auto client = opCtx->getClient(); @@ -87,5 +83,4 @@ public: MONGO_REGISTER_TEST_COMMAND(ReapLogicalSessionCacheNowCommand); } // namespace - } // namespace mongo diff --git a/src/mongo/db/commands/refresh_logical_session_cache_now.cpp b/src/mongo/db/commands/refresh_logical_session_cache_now.cpp index 8d09a15f17a..b6fb0cb4fd4 100644 --- a/src/mongo/db/commands/refresh_logical_session_cache_now.cpp +++ b/src/mongo/db/commands/refresh_logical_session_cache_now.cpp @@ -38,14 +38,9 @@ #include "mongo/db/operation_context.h" namespace mongo { - namespace { class RefreshLogicalSessionCacheNowCommand final : public BasicCommand { - RefreshLogicalSessionCacheNowCommand(const RefreshLogicalSessionCacheNowCommand&) = delete; - 
RefreshLogicalSessionCacheNowCommand& operator=(const RefreshLogicalSessionCacheNowCommand&) = - delete; - public: RefreshLogicalSessionCacheNowCommand() : BasicCommand("refreshLogicalSessionCacheNow") {} @@ -76,10 +71,10 @@ public: return Status::OK(); } - virtual bool run(OperationContext* opCtx, - const std::string& db, - const BSONObj& cmdObj, - BSONObjBuilder& result) override { + bool run(OperationContext* opCtx, + const std::string& db, + const BSONObj& cmdObj, + BSONObjBuilder& result) override { auto cache = LogicalSessionCache::get(opCtx); auto client = opCtx->getClient(); @@ -95,5 +90,4 @@ public: MONGO_REGISTER_TEST_COMMAND(RefreshLogicalSessionCacheNowCommand); } // namespace - } // namespace mongo diff --git a/src/mongo/db/commands/refresh_sessions_command.cpp b/src/mongo/db/commands/refresh_sessions_command.cpp index 229db890078..8e2c663e6cc 100644 --- a/src/mongo/db/commands/refresh_sessions_command.cpp +++ b/src/mongo/db/commands/refresh_sessions_command.cpp @@ -40,26 +40,28 @@ #include "mongo/db/refresh_sessions_gen.h" namespace mongo { +namespace { class RefreshSessionsCommand final : public BasicCommand { - RefreshSessionsCommand(const RefreshSessionsCommand&) = delete; - RefreshSessionsCommand& operator=(const RefreshSessionsCommand&) = delete; - public: RefreshSessionsCommand() : BasicCommand("refreshSessions") {} AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { return AllowedOnSecondary::kAlways; } + bool adminOnly() const override { return false; } + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } + std::string help() const override { return "renew a set of logical sessions"; } + Status checkAuthForOperation(OperationContext* opCtx, const std::string& dbname, const BSONObj& cmdObj) const override { @@ -75,18 +77,18 @@ public: } } - virtual bool run(OperationContext* opCtx, - const std::string& db, - const BSONObj& cmdObj, - BSONObjBuilder& result) override { + bool run(OperationContext* 
opCtx, + const std::string& db, + const BSONObj& cmdObj, + BSONObjBuilder& result) override { IDLParserErrorContext ctx("RefreshSessionsCmdFromClient"); auto cmd = RefreshSessionsCmdFromClient::parse(ctx, cmdObj); - auto res = - LogicalSessionCache::get(opCtx->getServiceContext())->refreshSessions(opCtx, cmd); - uassertStatusOK(res); + uassertStatusOK(LogicalSessionCache::get(opCtx)->refreshSessions(opCtx, cmd)); return true; } + } refreshSessionsCommand; +} // namespace } // namespace mongo diff --git a/src/mongo/db/repl/repl_set_commands.cpp b/src/mongo/db/repl/repl_set_commands.cpp index cf14e81a6aa..a38d510fd5a 100644 --- a/src/mongo/db/repl/repl_set_commands.cpp +++ b/src/mongo/db/repl/repl_set_commands.cpp @@ -359,18 +359,20 @@ private: class CmdReplSetReconfig : public ReplSetCommand { public: + CmdReplSetReconfig() : ReplSetCommand("replSetReconfig") {} + std::string help() const override { return "Adjust configuration of a replica set\n" "{ replSetReconfig : config_object }\n" "http://dochub.mongodb.org/core/replicasetcommands"; } - CmdReplSetReconfig() : ReplSetCommand("replSetReconfig") {} - virtual bool run(OperationContext* opCtx, - const string&, - const BSONObj& cmdObj, - BSONObjBuilder& result) { - Status status = ReplicationCoordinator::get(opCtx)->checkReplEnabledForCommand(&result); - uassertStatusOK(status); + + bool run(OperationContext* opCtx, + const string&, + const BSONObj& cmdObj, + BSONObjBuilder& result) override { + const auto replCoord = ReplicationCoordinator::get(opCtx); + uassertStatusOK(replCoord->checkReplEnabledForCommand(&result)); if (cmdObj["replSetReconfig"].type() != Object) { result.append("errmsg", "no configuration specified"); @@ -380,23 +382,22 @@ public: ReplicationCoordinator::ReplSetReconfigArgs parsedArgs; parsedArgs.newConfigObj = cmdObj["replSetReconfig"].Obj(); parsedArgs.force = cmdObj.hasField("force") && cmdObj["force"].trueValue(); - status = - ReplicationCoordinator::get(opCtx)->processReplSetReconfig(opCtx, 
parsedArgs, &result); - - Lock::GlobalLock globalLock(opCtx, MODE_IX); + auto status = replCoord->processReplSetReconfig(opCtx, parsedArgs, &result); - WriteUnitOfWork wuow(opCtx); if (status.isOK() && !parsedArgs.force) { + const auto service = opCtx->getServiceContext(); + + Lock::GlobalLock globalLock(opCtx, MODE_IX); + WriteUnitOfWork wuow(opCtx); // Users must not be allowed to provide their own contents for the o2 field. // o2 field of no-ops is supposed to be used internally. - getGlobalServiceContext()->getOpObserver()->onOpMessage( - opCtx, - BSON("msg" - << "Reconfig set" - << "version" - << parsedArgs.newConfigObj["version"])); + service->getOpObserver()->onOpMessage(opCtx, + BSON("msg" + << "Reconfig set" + << "version" + << parsedArgs.newConfigObj["version"])); + wuow.commit(); } - wuow.commit(); uassertStatusOK(status); return true; diff --git a/src/mongo/db/s/transaction_coordinator_service.cpp b/src/mongo/db/s/transaction_coordinator_service.cpp index d803e0987a3..2e93ca54435 100644 --- a/src/mongo/db/s/transaction_coordinator_service.cpp +++ b/src/mongo/db/s/transaction_coordinator_service.cpp @@ -169,7 +169,9 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx, LOG(0) << "Need to resume coordinating commit for " << coordinatorDocs.size() << " transactions"; - auto clockSource = opCtx->getServiceContext()->getFastClockSource(); + const auto service = opCtx->getServiceContext(); + const auto clockSource = service->getFastClockSource(); + auto& catalog = catalogAndScheduler->catalog; auto& scheduler = catalogAndScheduler->scheduler; @@ -180,7 +182,7 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx, const auto txnNumber = *doc.getId().getTxnNumber(); auto coordinator = std::make_shared<TransactionCoordinator>( - opCtx->getServiceContext(), + service, lsid, txnNumber, scheduler.makeChildScheduler(), @@ -191,9 +193,6 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx, } }) 
.tapAll([catalogAndScheduler = _catalogAndScheduler](Status status) { - // TODO (SERVER-38320): Reschedule the step-up task if the interruption was not due - // to stepdown. - auto& catalog = catalogAndScheduler->catalog; catalog.exitStepUp(status); }); diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h index 635f3fc7167..b26eb7cdb19 100644 --- a/src/mongo/db/session.h +++ b/src/mongo/db/session.h @@ -72,6 +72,9 @@ private: // practice, it is only used inside of the SessionCatalog itself. OperationContext* _checkoutOpCtx{nullptr}; + // Keeps the last time this session was checked-out + Date_t _lastCheckout{Date_t::now()}; + // Counter indicating the number of times ObservableSession::kill has been called on this // session, which have not yet had a corresponding call to checkOutSessionForKill. int _killsRequested{0}; diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp index 2e0d6de0953..db574180f6f 100644 --- a/src/mongo/db/session_catalog.cpp +++ b/src/mongo/db/session_catalog.cpp @@ -33,7 +33,6 @@ #include "mongo/db/session_catalog.h" -#include "mongo/db/operation_context.h" #include "mongo/db/service_context.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" @@ -84,12 +83,16 @@ SessionCatalog::ScopedCheckedOutSession SessionCatalog::_checkOutSession(Operati // Wait until the session is no longer checked out and until the previously scheduled kill has // completed + ++sri->numWaitingToCheckOut; + ON_BLOCK_EXIT([&] { --sri->numWaitingToCheckOut; }); + opCtx->waitForConditionOrInterrupt(sri->availableCondVar, ul, [&ul, &sri]() { ObservableSession osession(ul, sri->session); return !osession.currentOperation() && !osession._killed(); }); sri->session._checkoutOpCtx = opCtx; + sri->session._lastCheckout = Date_t::now(); return ScopedCheckedOutSession( *this, std::move(sri), boost::none /* Not checked out for kill */); @@ -107,33 +110,62 @@ SessionCatalog::SessionToKill 
SessionCatalog::checkOutSessionForKill(OperationCo invariant(ObservableSession(ul, sri->session)._killed()); // Wait until the session is no longer checked out + ++sri->numWaitingToCheckOut; + ON_BLOCK_EXIT([&] { --sri->numWaitingToCheckOut; }); + opCtx->waitForConditionOrInterrupt(sri->availableCondVar, ul, [&ul, &sri] { ObservableSession osession(ul, sri->session); return !osession.currentOperation(); }); sri->session._checkoutOpCtx = opCtx; + sri->session._lastCheckout = Date_t::now(); return SessionToKill(ScopedCheckedOutSession(*this, std::move(sri), std::move(killToken))); } void SessionCatalog::scanSession(const LogicalSessionId& lsid, const ScanSessionsCallbackFn& workerFn) { - stdx::lock_guard<stdx::mutex> lg(_mutex); - auto it = _sessions.find(lsid); - if (it != _sessions.end()) - workerFn({lg, it->second->session}); + std::unique_ptr<SessionRuntimeInfo> sessionToReap; + + { + stdx::lock_guard<stdx::mutex> lg(_mutex); + auto it = _sessions.find(lsid); + if (it != _sessions.end()) { + auto& sri = it->second; + ObservableSession osession(lg, sri->session); + workerFn(osession); + + if (osession._markedForReap && !osession._killed() && !osession.currentOperation() && + !sri->numWaitingToCheckOut) { + sessionToReap = std::move(sri); + _sessions.erase(it); + } + } + } } void SessionCatalog::scanSessions(const SessionKiller::Matcher& matcher, const ScanSessionsCallbackFn& workerFn) { - stdx::lock_guard<stdx::mutex> lg(_mutex); + std::vector<std::unique_ptr<SessionRuntimeInfo>> sessionsToReap; + + { + stdx::lock_guard<stdx::mutex> lg(_mutex); - LOG(2) << "Beginning scanSessions. Scanning " << _sessions.size() << " sessions."; + LOG(2) << "Beginning scanSessions. 
Scanning " << _sessions.size() << " sessions."; - for (auto& sessionEntry : _sessions) { - if (matcher.match(sessionEntry.first)) { - workerFn({lg, sessionEntry.second->session}); + for (auto it = _sessions.begin(); it != _sessions.end(); ++it) { + if (matcher.match(it->first)) { + auto& sri = it->second; + ObservableSession osession(lg, sri->session); + workerFn(osession); + + if (osession._markedForReap && !osession._killed() && + !osession.currentOperation() && !sri->numWaitingToCheckOut) { + sessionsToReap.emplace_back(std::move(sri)); + _sessions.erase(it++); + } + } } } } @@ -147,6 +179,11 @@ SessionCatalog::KillToken SessionCatalog::killSession(const LogicalSessionId& ls return ObservableSession(lg, sri->session).kill(); } +size_t SessionCatalog::size() const { + stdx::lock_guard<stdx::mutex> lg(_mutex); + return _sessions.size(); +} + SessionCatalog::SessionRuntimeInfo* SessionCatalog::_getOrCreateSessionRuntimeInfo( WithLock, OperationContext* opCtx, const LogicalSessionId& lsid) { auto it = _sessions.find(lsid); @@ -174,6 +211,10 @@ void SessionCatalog::_releaseSession(SessionRuntimeInfo* sri, } } +SessionCatalog::SessionRuntimeInfo::~SessionRuntimeInfo() { + invariant(!numWaitingToCheckOut); +} + SessionCatalog::KillToken ObservableSession::kill(ErrorCodes::Error reason) const { const bool firstKiller = (0 == _session->_killsRequested); ++_session->_killsRequested; @@ -189,6 +230,10 @@ SessionCatalog::KillToken ObservableSession::kill(ErrorCodes::Error reason) cons return SessionCatalog::KillToken(getSessionId()); } +void ObservableSession::markForReap() { + _markedForReap = true; +} + bool ObservableSession::_killed() const { return _session->_killsRequested > 0; } diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h index c88ed62eb96..d0f97902461 100644 --- a/src/mongo/db/session_catalog.h +++ b/src/mongo/db/session_catalog.h @@ -98,7 +98,7 @@ public: * Iterates through the SessionCatalog and applies 'workerFn' to each Session. 
This locks the * SessionCatalog. */ - using ScanSessionsCallbackFn = stdx::function<void(const ObservableSession&)>; + using ScanSessionsCallbackFn = stdx::function<void(ObservableSession&)>; void scanSession(const LogicalSessionId& lsid, const ScanSessionsCallbackFn& workerFn); void scanSessions(const SessionKiller::Matcher& matcher, const ScanSessionsCallbackFn& workerFn); @@ -109,18 +109,30 @@ public: */ KillToken killSession(const LogicalSessionId& lsid); + /** + * Returns the total number of entries currently cached on the session catalog. + */ + size_t size() const; + private: struct SessionRuntimeInfo { SessionRuntimeInfo(LogicalSessionId lsid) : session(std::move(lsid)) {} + ~SessionRuntimeInfo(); // Must only be accessed when the state is kInUse and only by the operation context, which // currently has it checked out Session session; + // Counts how many threads have called checkOutSession/checkOutSessionForKill and are + // blocked in it waiting for the session to become available. Used to block reaping of + // sessions entries from the map. + int numWaitingToCheckOut{0}; + // Signaled when the state becomes available. Uses the transaction table's mutex to protect // the state transitions. stdx::condition_variable availableCondVar; }; + using SessionRuntimeInfoMap = LogicalSessionIdMap<std::unique_ptr<SessionRuntimeInfo>>; /** * Blocking method, which checks-out the session set on 'opCtx'. @@ -141,10 +153,10 @@ private: void _releaseSession(SessionRuntimeInfo* sri, boost::optional<KillToken> killToken); // Protects the state below - stdx::mutex _mutex; + mutable stdx::mutex _mutex; // Owns the Session objects for all current Sessions. 
- LogicalSessionIdMap<std::unique_ptr<SessionRuntimeInfo>> _sessions; + SessionRuntimeInfoMap _sessions; }; /** @@ -205,10 +217,10 @@ private: class SessionCatalog::SessionToKill { public: SessionToKill(ScopedCheckedOutSession&& scos) : _scos(std::move(scos)) {} + Session* get() const { return _scos.get(); } - const LogicalSessionId& getSessionId() const { return get()->getSessionId(); } @@ -250,6 +262,13 @@ public: } /** + * Returns the last time this session was checked out, for reaping purposes. + */ + Date_t getLastCheckout() const { + return _session->_lastCheckout; + } + + /** * Increments the number of "killers" for this session and returns a 'kill token' to be * passed later on to 'checkOutSessionForKill' method of the SessionCatalog in order to permit * the caller to execute any kill cleanup tasks. This token is later used to decrement the @@ -265,6 +284,16 @@ public: SessionCatalog::KillToken kill(ErrorCodes::Error reason = ErrorCodes::Interrupted) const; /** + * Indicates to the SessionCatalog that the session tracked by this object is safe to be deleted + * from the map. It is up to the caller to provide the necessary checks that all the decorations + * they are using are prepared to be destroyed. + * + * Calling this method does not guarantee that the session will in fact be destroyed, which + * could happen if there are threads waiting for it to be checked-out. + */ + void markForReap(); + + /** * Returns a pointer to the Session itself. 
*/ Session* get() const { @@ -291,6 +320,7 @@ private: Session* _session; stdx::unique_lock<Client> _clientLock; + bool _markedForReap{false}; }; /** diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp index 0611994c6be..518b871f3fc 100644 --- a/src/mongo/db/session_catalog_mongod.cpp +++ b/src/mongo/db/session_catalog_mongod.cpp @@ -141,6 +141,7 @@ int removeSessionsTransactionRecords(OperationContext* opCtx, if (expiredSessionIds.empty()) return 0; + // Remove the session ids from the on-disk catalog write_ops::Delete deleteOp(NamespaceString::kSessionTransactionsTableNamespace); deleteOp.setWriteCommandBase([] { write_ops::WriteCommandBase base; @@ -343,6 +344,42 @@ void MongoDSessionCatalog::invalidateAllSessions(OperationContext* opCtx) { int MongoDSessionCatalog::reapSessionsOlderThan(OperationContext* opCtx, SessionsCollection& sessionsCollection, Date_t possiblyExpired) { + { + const auto catalog = SessionCatalog::get(opCtx); + + // Capture the possibly expired in-memory session ids + LogicalSessionIdSet lsids; + catalog->scanSessions(SessionKiller::Matcher( + KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)}), + [&](const ObservableSession& session) { + if (session.getLastCheckout() < possiblyExpired) { + lsids.insert(session.getSessionId()); + } + }); + + // From the passed-in sessions, find the ones which are actually expired/removed + auto expiredSessionIds = + uassertStatusOK(sessionsCollection.findRemovedSessions(opCtx, lsids)); + + // Remove the session ids from the in-memory catalog + for (const auto& lsid : expiredSessionIds) { + catalog->scanSession(lsid, [](ObservableSession& session) { + const auto participant = TransactionParticipant::get(session); + if (!participant.inMultiDocumentTransaction()) { + session.markForReap(); + } + }); + } + } + + // The "unsafe" check for primary below is a best-effort attempt to ensure that the on-disk + // state reaping code doesn't run if the node is 
secondary and cause log spam. It is a work + // around the fact that the logical sessions cache is not registered to listen for replication + // state changes. + const auto replCoord = repl::ReplicationCoordinator::get(opCtx); + if (!replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, NamespaceString::kConfigDb)) + return 0; + // Scan for records older than the minimum lifetime and uses a sort to walk the '_id' index DBDirectClient client(opCtx); auto cursor = diff --git a/src/mongo/db/session_catalog_mongod.h b/src/mongo/db/session_catalog_mongod.h index b55f8927f3f..591d234029b 100644 --- a/src/mongo/db/session_catalog_mongod.h +++ b/src/mongo/db/session_catalog_mongod.h @@ -69,6 +69,8 @@ public: /** * Locates session entries from the in-memory catalog and in 'config.transactions' which have * not been referenced before 'possiblyExpired' and deletes them. + * + * Returns the number of sessions, which were reaped from the persisted store on disk. */ static int reapSessionsOlderThan(OperationContext* opCtx, SessionsCollection& sessionsCollection, diff --git a/src/mongo/db/session_catalog_test.cpp b/src/mongo/db/session_catalog_test.cpp index 3bd63277365..da7712f89d4 100644 --- a/src/mongo/db/session_catalog_test.cpp +++ b/src/mongo/db/session_catalog_test.cpp @@ -117,6 +117,67 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, NestedOperationContextSession) { ASSERT(!OperationContextSession::get(_opCtx)); } +TEST_F(SessionCatalogTest, ScanSession) { + // Create three sessions in the catalog. 
+ const std::vector<LogicalSessionId> lsids{makeLogicalSessionIdForTest(), + makeLogicalSessionIdForTest(), + makeLogicalSessionIdForTest()}; + for (const auto& lsid : lsids) { + stdx::async(stdx::launch::async, [this, lsid] { + ThreadClient tc(getServiceContext()); + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + OperationContextSession ocs(opCtx.get()); + }).get(); + } + + catalog()->scanSession(lsids[0], [&lsids](const ObservableSession& session) { + ASSERT_EQ(lsids[0], session.get()->getSessionId()); + }); + + catalog()->scanSession(lsids[1], [&lsids](const ObservableSession& session) { + ASSERT_EQ(lsids[1], session.get()->getSessionId()); + }); + + catalog()->scanSession(lsids[2], [&lsids](const ObservableSession& session) { + ASSERT_EQ(lsids[2], session.get()->getSessionId()); + }); + + catalog()->scanSession(makeLogicalSessionIdForTest(), [](const ObservableSession&) { + FAIL("The callback was called for non-existent session"); + }); +} + +TEST_F(SessionCatalogTest, ScanSessionMarkForReapWhenSessionIsIdle) { + // Create three sessions in the catalog. 
+ const std::vector<LogicalSessionId> lsids{makeLogicalSessionIdForTest(), + makeLogicalSessionIdForTest(), + makeLogicalSessionIdForTest()}; + for (const auto& lsid : lsids) { + stdx::async(stdx::launch::async, [this, lsid] { + ThreadClient tc(getServiceContext()); + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + OperationContextSession ocs(opCtx.get()); + }).get(); + } + + catalog()->scanSession(lsids[0], + [&lsids](ObservableSession& session) { session.markForReap(); }); + + catalog()->scanSession(lsids[0], [](const ObservableSession&) { + FAIL("The callback was called for non-existent session"); + }); + + catalog()->scanSession(lsids[1], [&lsids](const ObservableSession& session) { + ASSERT_EQ(lsids[1], session.get()->getSessionId()); + }); + + catalog()->scanSession(lsids[2], [&lsids](const ObservableSession& session) { + ASSERT_EQ(lsids[2], session.get()->getSessionId()); + }); +} + TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessions) { std::vector<LogicalSessionId> lsidsFound; const auto workerFn = [&lsidsFound](const ObservableSession& session) { @@ -157,6 +218,47 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessions) { lsidsFound.clear(); } +TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessionsMarkForReap) { + // Create three sessions in the catalog. 
+ const std::vector<LogicalSessionId> lsids{makeLogicalSessionIdForTest(), + makeLogicalSessionIdForTest(), + makeLogicalSessionIdForTest()}; + + unittest::Barrier sessionsCheckedOut(2); + unittest::Barrier sessionsCheckedIn(2); + + auto f = stdx::async(stdx::launch::async, [&] { + ThreadClient tc(getServiceContext()); + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsids[1]); + OperationContextSession ocs(opCtx.get()); + sessionsCheckedOut.countDownAndWait(); + sessionsCheckedIn.countDownAndWait(); + }); + + // After this wait, session 1 is checked-out and waiting on the barrier, because of which only + // sessions 0 and 2 will be reaped + sessionsCheckedOut.countDownAndWait(); + + SessionKiller::Matcher matcherAllSessions( + KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(_opCtx)}); + + catalog()->scanSessions(matcherAllSessions, + [&](ObservableSession& session) { session.markForReap(); }); + + catalog()->scanSessions(matcherAllSessions, [&](const ObservableSession& session) { + ASSERT_EQ(lsids[1], session.get()->getSessionId()); + }); + + // After this point, session 1 is checked back in + sessionsCheckedIn.countDownAndWait(); + f.get(); + + catalog()->scanSessions(matcherAllSessions, [&](const ObservableSession& session) { + ASSERT_EQ(lsids[1], session.get()->getSessionId()); + }); +} + TEST_F(SessionCatalogTest, KillSessionWhenSessionIsNotCheckedOut) { const auto lsid = makeLogicalSessionIdForTest(); diff --git a/src/mongo/db/sessions_collection.cpp b/src/mongo/db/sessions_collection.cpp index 7ffc762201e..6421302ab28 100644 --- a/src/mongo/db/sessions_collection.cpp +++ b/src/mongo/db/sessions_collection.cpp @@ -46,9 +46,6 @@ #include "mongo/stdx/memory.h" namespace mongo { - -constexpr StringData SessionsCollection::kSessionsTTLIndex; - namespace { // This batch size is chosen to ensure that we don't form requests larger than the 16mb limit. 
@@ -161,6 +158,10 @@ Status runBulkCmd(StringData label, } // namespace +constexpr StringData SessionsCollection::kSessionsTTLIndex; + +SessionsCollection::SessionsCollection() = default; + SessionsCollection::~SessionsCollection() = default; SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForBatchWrite( diff --git a/src/mongo/db/sessions_collection.h b/src/mongo/db/sessions_collection.h index 64e78c8a476..91ae86e7c9f 100644 --- a/src/mongo/db/sessions_collection.h +++ b/src/mongo/db/sessions_collection.h @@ -34,8 +34,6 @@ namespace mongo { -class BSONArrayBuilder; -class BSONObjBuilder; class DBClientBase; class OperationContext; @@ -46,7 +44,6 @@ class OperationContext; * implement their own classes that fulfill this interface. */ class SessionsCollection { - public: static constexpr StringData kSessionsTTLIndex = "lsidTTLIndex"_sd; @@ -98,6 +95,8 @@ public: static BSONObj generateCollModCmd(); protected: + SessionsCollection(); + /** * Makes a send function for the given client. */ diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 3ba34fd40b3..548453ab45a 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -291,7 +291,9 @@ const BSONObj TransactionParticipant::kDeadEndSentinel(BSON("$incompleteOplogHis TransactionParticipant::TransactionParticipant() = default; -TransactionParticipant::~TransactionParticipant() = default; +TransactionParticipant::~TransactionParticipant() { + // invariant(!_o.txnState.isInProgress()); +} TransactionParticipant::Observer::Observer(const ObservableSession& osession) : Observer(&getTransactionParticipant(osession.get())) {} @@ -2027,20 +2029,18 @@ void TransactionParticipant::Participant::invalidate(OperationContext* opCtx) { void TransactionParticipant::Participant::abortPreparedTransactionForRollback( OperationContext* opCtx) { - stdx::lock_guard<Client> lg(*opCtx->getClient()); - - // Invalidate the session. 
- _invalidate(lg); - uassert(51030, str::stream() << "Cannot call abortPreparedTransactionForRollback on unprepared " << "transaction.", o().txnState.isPrepared()); + stdx::lock_guard<Client> lg(*opCtx->getClient()); + // It should be safe to clear transactionOperationBytes and transactionOperations because // we only modify these variables when adding an operation to a transaction. Since this // transaction is already prepared, we cannot add more operations to it. We will have this // in the prepare oplog entry. + _invalidate(lg); _resetTransactionState(lg, TransactionState::kNone); } |