diff options
author | Spencer T Brody <spencer@mongodb.com> | 2018-09-13 13:31:02 -0400 |
---|---|---|
committer | Spencer T Brody <spencer@mongodb.com> | 2018-09-17 13:58:39 -0400 |
commit | c6d90316d6b694e12426274c713a4a078e004fc5 (patch) | |
tree | d5dc831d02fe2f0b7249488fa74d69071a4da4f9 | |
parent | 9cb3cdea9cd05686abc2300e7f18bd2a51437240 (diff) | |
download | mongo-c6d90316d6b694e12426274c713a4a078e004fc5.tar.gz |
SERVER-35870 Add functionality to prevent Session checkouts & wait for all Sessions to be checked in
-rw-r--r-- | src/mongo/db/session_catalog.cpp | 38 | ||||
-rw-r--r-- | src/mongo/db/session_catalog.h | 37 | ||||
-rw-r--r-- | src/mongo/db/session_catalog_test.cpp | 69 |
3 files changed, 144 insertions, 0 deletions
diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp index 2c08a5f0083..bbc201c12c3 100644 --- a/src/mongo/db/session_catalog.cpp +++ b/src/mongo/db/session_catalog.cpp @@ -128,6 +128,10 @@ ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext* opCtx) stdx::unique_lock<stdx::mutex> ul(_mutex); + while (!_allowCheckingOutSessions) { + opCtx->waitForConditionOrInterrupt(_checkingOutSessionsAllowedCond, ul); + } + auto sri = _getOrCreateSessionRuntimeInfo(ul, opCtx, lsid); // Wait until the session is no longer checked out @@ -136,6 +140,7 @@ ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext* opCtx) invariant(!sri->checkedOut); sri->checkedOut = true; + ++_numCheckedOutSessions; return ScopedCheckedOutSession(opCtx, ScopedSession(std::move(sri))); } @@ -215,6 +220,7 @@ void SessionCatalog::scanSessions(OperationContext* opCtx, std::shared_ptr<SessionCatalog::SessionRuntimeInfo> SessionCatalog::_getOrCreateSessionRuntimeInfo( WithLock, OperationContext* opCtx, const LogicalSessionId& lsid) { invariant(!opCtx->lockState()->inAWriteUnitOfWork()); + invariant(_allowCheckingOutSessions); auto it = _sessions.find(lsid); if (it == _sessions.end()) { @@ -235,6 +241,38 @@ void SessionCatalog::_releaseSession(const LogicalSessionId& lsid) { sri->checkedOut = false; sri->availableCondVar.notify_one(); + --_numCheckedOutSessions; + if (_numCheckedOutSessions == 0) { + _allSessionsCheckedInCond.notify_all(); + } +} + +SessionCatalog::PreventCheckingOutSessionsBlock::PreventCheckingOutSessionsBlock( + SessionCatalog* sessionCatalog) + : _sessionCatalog(sessionCatalog) { + invariant(sessionCatalog); + + stdx::lock_guard<stdx::mutex> lg(sessionCatalog->_mutex); + invariant(sessionCatalog->_allowCheckingOutSessions); + sessionCatalog->_allowCheckingOutSessions = false; +} + +SessionCatalog::PreventCheckingOutSessionsBlock::~PreventCheckingOutSessionsBlock() { + stdx::lock_guard<stdx::mutex> lg(_sessionCatalog->_mutex); + + invariant(!_sessionCatalog->_allowCheckingOutSessions); + _sessionCatalog->_allowCheckingOutSessions = true; + _sessionCatalog->_checkingOutSessionsAllowedCond.notify_all(); +} + +void SessionCatalog::PreventCheckingOutSessionsBlock::waitForAllSessionsToBeCheckedIn( + OperationContext* opCtx) { + stdx::unique_lock<stdx::mutex> ul(_sessionCatalog->_mutex); + + invariant(!_sessionCatalog->_allowCheckingOutSessions); + while (_sessionCatalog->_numCheckedOutSessions > 0) { + opCtx->waitForConditionOrInterrupt(_sessionCatalog->_allSessionsCheckedInCond, ul); + } } OperationContextSession::OperationContextSession(OperationContext* opCtx, bool checkOutSession) diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h index fb7531d11d3..e4ca4dda0d6 100644 --- a/src/mongo/db/session_catalog.h +++ b/src/mongo/db/session_catalog.h @@ -53,6 +53,8 @@ class SessionCatalog { friend class ScopedCheckedOutSession; public: + class PreventCheckingOutSessionsBlock; + SessionCatalog() = default; ~SessionCatalog(); @@ -161,8 +163,43 @@ private: */ void _releaseSession(const LogicalSessionId& lsid); + // Protects members below. stdx::mutex _mutex; + + // Owns the Session objects for all current Sessions. SessionRuntimeInfoMap _sessions; + + // Count of the number of Sessions that are currently checked out. + uint32_t _numCheckedOutSessions{0}; + + // Set to false to cause all Session checkout or creation requests to block. + bool _allowCheckingOutSessions{true}; + + // Condition that is signaled when the number of checked out sessions goes to 0. + stdx::condition_variable _allSessionsCheckedInCond; + + // Condition that is signaled when checking out Sessions becomes legal again after having + // previously been forbidden. + stdx::condition_variable _checkingOutSessionsAllowedCond; +}; + +/** + * While this object is in scope, all requests to check out a Session will block. + */ +class SessionCatalog::PreventCheckingOutSessionsBlock { + MONGO_DISALLOW_COPYING(PreventCheckingOutSessionsBlock); + +public: + explicit PreventCheckingOutSessionsBlock(SessionCatalog* sessionCatalog); + ~PreventCheckingOutSessionsBlock(); + + /** + * Waits until there are no Sessions checked out in the SessionCatalog. + */ + void waitForAllSessionsToBeCheckedIn(OperationContext* opCtx); + +private: + SessionCatalog* _sessionCatalog{nullptr}; }; /** diff --git a/src/mongo/db/session_catalog_test.cpp b/src/mongo/db/session_catalog_test.cpp index c55c2c780db..e0343f8672a 100644 --- a/src/mongo/db/session_catalog_test.cpp +++ b/src/mongo/db/session_catalog_test.cpp @@ -207,5 +207,74 @@ TEST_F(SessionCatalogTest, ScanSessions) { ASSERT_EQ(lsids.front(), lsid2); } +TEST_F(SessionCatalogTest, PreventCheckout) { + const auto lsid = makeLogicalSessionIdForTest(); + opCtx()->setLogicalSessionId(lsid); + opCtx()->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); + + { + SessionCatalog::PreventCheckingOutSessionsBlock preventCheckoutBlock(catalog()); + + ASSERT_THROWS_CODE( + catalog()->checkOutSession(opCtx()), AssertionException, ErrorCodes::MaxTimeMSExpired); + } + + auto scopedSession = catalog()->checkOutSession(opCtx()); + ASSERT(scopedSession.get()); + ASSERT_EQ(lsid, scopedSession->getSessionId()); +} + +TEST_F(SessionCatalogTest, WaitForAllSessions) { + const auto lsid1 = makeLogicalSessionIdForTest(); + const auto lsid2 = makeLogicalSessionIdForTest(); + opCtx()->setLogicalSessionId(lsid1); + + // Check out a Session. + boost::optional<OperationContextSession> ocs; + ocs.emplace(opCtx(), true); + ASSERT_EQ(lsid1, ocs->get(opCtx())->getSessionId()); + + // Prevent new Sessions from being checked out. + boost::optional<SessionCatalog::PreventCheckingOutSessionsBlock> preventCheckoutBlock; + preventCheckoutBlock.emplace(catalog()); + + // Enqueue a request to check out a Session. + auto future = stdx::async(stdx::launch::async, [&] { + ON_BLOCK_EXIT([&] { Client::destroy(); }); + Client::initThreadIfNotAlready(); + auto sideOpCtx = Client::getCurrent()->makeOperationContext(); + sideOpCtx->setLogicalSessionId(lsid2); + auto asyncScopedSession = + SessionCatalog::get(sideOpCtx.get())->checkOutSession(sideOpCtx.get()); + + ASSERT(asyncScopedSession.get()); + ASSERT_EQ(lsid2, asyncScopedSession->getSessionId()); + }); + + // Ensure that waitForAllSessionsToBeCheckedIn() times out since we are holding a Session + // checked out. + opCtx()->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); + ASSERT_THROWS_CODE(preventCheckoutBlock->waitForAllSessionsToBeCheckedIn(opCtx()), + AssertionException, + ErrorCodes::MaxTimeMSExpired); + + ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration())); + + // Release the Session we have checked out. + ocs.reset(); + + // Now ensure that waitForAllSessionsToBeCheckedIn() can complete. + preventCheckoutBlock->waitForAllSessionsToBeCheckedIn(opCtx()); + + // Ensure that the async thread trying to check out a Session is still blocked. + ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration())); + + // Allow checking out Sessions to proceed. + preventCheckoutBlock.reset(); + + // Ensure that the async thread can now proceed and successfully check out a Session. + future.get(); +} + } // namespace } // namespace mongo |