diff options
author | George Wangensteen <george.wangensteen@mongodb.com> | 2021-10-13 19:38:12 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-16 17:51:42 +0000 |
commit | ca1a96490687e5bff52c35bb5eec8408e0912291 (patch) | |
tree | 49d69e9c356e4a9b65920afb3b8568e02f0c72fa | |
parent | 0a38f458292dc64d5be84663bec8102d9a232f1e (diff) | |
download | mongo-ca1a96490687e5bff52c35bb5eec8408e0912291.tar.gz |
SERVER-58503 Kill open cursors for a connection when a load balanced connection on mongos is closed
(cherry picked from commit b429d5dda98bbe18ab0851ffd1729d3b57fc8a4e)
-rw-r--r-- | jstests/sharding/load_balancer_support/disconnect_kills_cursors.js | 97 | ||||
-rw-r--r-- | src/mongo/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/s/load_balancer_support.cpp | 20 | ||||
-rw-r--r-- | src/mongo/s/load_balancer_support.h | 7 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_cursor_manager.cpp | 51 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_cursor_manager.h | 292 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_cursor_manager_test.cpp | 424 | ||||
-rw-r--r-- | src/mongo/s/service_entry_point_mongos.cpp | 13 | ||||
-rw-r--r-- | src/mongo/s/service_entry_point_mongos.h | 2 | ||||
-rw-r--r-- | src/mongo/transport/service_entry_point.h | 8 | ||||
-rw-r--r-- | src/mongo/transport/service_state_machine.cpp | 3 | ||||
-rw-r--r-- | src/mongo/transport/service_state_machine_test.cpp | 26 |
12 files changed, 611 insertions, 333 deletions
diff --git a/jstests/sharding/load_balancer_support/disconnect_kills_cursors.js b/jstests/sharding/load_balancer_support/disconnect_kills_cursors.js new file mode 100644 index 00000000000..133f6bad748 --- /dev/null +++ b/jstests/sharding/load_balancer_support/disconnect_kills_cursors.js @@ -0,0 +1,97 @@ +/** + * @tags: [featureFlagLoadBalancer] + * + * Tests that when a load-balanced client disconnects, its cursors are killed. + */ + +(() => { + "use strict"; + load("jstests/libs/parallelTester.js"); + + function setupShardedCollection(st, dbName, collName) { + const fullNss = dbName + "." + collName; + const admin = st.s.getDB("admin"); + // Shard collection; ensure docs on each shard + assert.commandWorked(admin.runCommand({enableSharding: dbName})); + assert.commandWorked(admin.runCommand({movePrimary: dbName, to: st.shard0.shardName})); + assert.commandWorked(admin.runCommand({shardCollection: fullNss, key: {_id: 1}})); + assert.commandWorked(admin.runCommand({split: fullNss, middle: {_id: 0}})); + assert.commandWorked( + admin.runCommand({moveChunk: fullNss, find: {_id: 0}, to: st.shard1.shardName})); + + // Insert some docs on each shard + let coll = admin.getSiblingDB(dbName).getCollection(collName); + var bulk = coll.initializeUnorderedBulkOp(); + for (let i = -150; i < 150; i++) { + bulk.insert({_id: i}); + } + assert.commandWorked(bulk.execute()); + } + + function openCursor(mongosHost, dbName, collName, countdownLatch, identifyingComment) { + const newDBConn = new Mongo(mongosHost).getDB(dbName); + assert.commandWorked(newDBConn.getSiblingDB("admin").adminCommand( + {configureFailPoint: "clientIsFromLoadBalancer", mode: "alwaysOn"})); + let cmdRes = + newDBConn.runCommand({find: collName, comment: identifyingComment, batchSize: 1}); + assert.commandWorked(cmdRes); + const cursorId = cmdRes.cursor.id; + assert.neq(cursorId, NumberLong(0)); + // Wait until countdownLatch value is 0. + countdownLatch.await(); + return cursorId; + } + + const testName = "load_balanced_disconnect_kills_cursors"; + let st = new ShardingTest({shards: 2, mongos: 1}); + const dbName = "foo"; + const collName = "bar"; + const admin = st.s.getDB("admin"); + const identifyingComment = "loadBalancedDisconnectComment"; + + setupShardedCollection(st, dbName, collName); + let countdownLatch = new CountDownLatch(1); + + let cursorOpeningThread = + new Thread(openCursor, st.s.host, dbName, collName, countdownLatch, identifyingComment); + cursorOpeningThread.start(); + + let idleCursor = {}; + + // Wait until we can see the cursor opened by cursorOpeningThread, identified by the comment, as + // idle. + assert.soon(() => { + const curopCursor = admin.aggregate([ + {$currentOp: {allUsers: true, idleCursors: true, localOps: true}}, + {$match: {type: "idleCursor"}}, + {$match: {"cursor.originatingCommand.comment": identifyingComment}} + ]); + if (curopCursor.hasNext()) { + idleCursor = curopCursor.next().cursor; + return true; + } + return false; + }, "Couldn't find cursor opened by cursorOpeningThread"); + + // We've found the cursor opened by cursorOpeningThread. + // We now join that thread, and therefore end its connection to the server. + countdownLatch.countDown(); + cursorOpeningThread.join(); + assert.commandWorked( + admin.adminCommand({configureFailPoint: "clientIsFromLoadBalancer", mode: "off"})); + + let cursorId = cursorOpeningThread.returnData(); + assert.eq(idleCursor.cursorId, cursorId); + + // Make sure we can't find that cursor anymore/it has been killed. + const numCursorsFoundWithId = + admin + .aggregate([ + {$currentOp: {allUsers: true, idleCursors: true, localOps: true}}, + {$match: {type: "idleCursor"}}, + {$match: {"cursor.cursorId": cursorId}} + ]) + .itcount(); + assert.eq(numCursorsFoundWithId, NumberLong(0)); + st.stop(); +})(); diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 98e81ffb1be..4eeb40e6e38 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -554,6 +554,7 @@ env.Library( 'cluster_last_error_info', 'commands/cluster_commands', 'committed_optime_metadata_hook', + 'load_balancer_support', 'mongos_initializers', 'mongos_topology_coordinator', 'query/cluster_cursor_cleanup_job', diff --git a/src/mongo/s/load_balancer_support.cpp b/src/mongo/s/load_balancer_support.cpp index b1ebf6bb6d3..1f2e1dcaf9d 100644 --- a/src/mongo/s/load_balancer_support.cpp +++ b/src/mongo/s/load_balancer_support.cpp @@ -45,11 +45,6 @@ namespace { MONGO_FAIL_POINT_DEFINE(clientIsFromLoadBalancer); -bool featureEnabled() { - return feature_flags::gFeatureFlagLoadBalancer.isEnabled( - serverGlobalParams.featureCompatibility); -} - struct PerService { /** * When a client reaches a mongos through a load balancer, the `serviceId` @@ -92,15 +87,20 @@ const auto getPerServiceState = ServiceContext::declareDecoration<PerService>(); const auto getPerClientState = Client::declareDecoration<PerClient>(); } // namespace +bool isEnabled() { + return feature_flags::gFeatureFlagLoadBalancer.isEnabled( + serverGlobalParams.featureCompatibility); +} + void setClientIsFromLoadBalancer(Client* client) { - if (!featureEnabled()) + if (!isEnabled()) return; auto& perClient = getPerClientState(client); perClient.setIsFromLoadBalancer(); } void handleHello(OperationContext* opCtx, BSONObjBuilder* result, bool helloHasLoadBalancedOption) { - if (!featureEnabled()) + if (!isEnabled()) return; auto& perClient = getPerClientState(opCtx->getClient()); if (perClient.didHello() || !perClient.isFromLoadBalancer()) @@ -115,4 +115,10 @@ void handleHello(OperationContext* opCtx, BSONObjBuilder* result, bool helloHasL perClient.setDidHello(); } +bool isFromLoadBalancer(Client* client) { + if (!isEnabled()) { + return false; + } + return getPerClientState(client).isFromLoadBalancer(); +} } // namespace mongo::load_balancer_support diff --git a/src/mongo/s/load_balancer_support.h b/src/mongo/s/load_balancer_support.h index c3d46d65823..1000a2f32f7 100644 --- a/src/mongo/s/load_balancer_support.h +++ b/src/mongo/s/load_balancer_support.h @@ -54,4 +54,11 @@ void setClientIsFromLoadBalancer(Client* client); */ void handleHello(OperationContext* opCtx, BSONObjBuilder* result, bool helloHasLoadBalancedOption); +bool isFromLoadBalancer(Client* client); + +/** + * Returns whether the feature flag for load balancer support is enabled. + */ +bool isEnabled(); + } // namespace mongo::load_balancer_support diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp index be5f905ff35..668795d20fd 100644 --- a/src/mongo/s/query/cluster_cursor_manager.cpp +++ b/src/mongo/s/query/cluster_cursor_manager.cpp @@ -234,6 +234,7 @@ StatusWith<CursorId> ClusterCursorManager::registerCursor( cursorLifetime, now, authenticatedUsers, + opCtx->getClient()->getUUID(), opCtx->getOperationKey())); invariant(emplaceResult.second); _log.push({LogEvent::Type::kRegisterComplete, cursorId, now, nss}); @@ -408,46 +409,36 @@ void ClusterCursorManager::detachAndKillCursor(stdx::unique_lock<Latch> lk, std::size_t ClusterCursorManager::killMortalCursorsInactiveSince(OperationContext* opCtx, Date_t cutoff) { - const auto now = _clockSource->now(); - stdx::unique_lock<Latch> lk(_mutex); - - auto pred = [cutoff](CursorId cursorId, const CursorEntry& entry) -> bool { - if (entry.getLifetimeType() == CursorLifetime::Immortal || - entry.getOperationUsingCursor() || - (entry.getLsid() && !enableTimeoutOfInactiveSessionCursors.load())) { - return false; - } - - bool res = entry.getLastActive() <= cutoff; + return killCursorsSatisfying( + opCtx, [cutoff](CursorId cursorId, const CursorEntry& entry) -> bool { + if (entry.getLifetimeType() == CursorLifetime::Immortal || + entry.getOperationUsingCursor() || + (entry.getLsid() && !enableTimeoutOfInactiveSessionCursors.load())) { + return false; + } - if (res) { - LOGV2(22837, - "Cursor timed out", - "cursorId"_attr = cursorId, - "idleSince"_attr = entry.getLastActive().toString()); - } + bool res = entry.getLastActive() <= cutoff; - return res; - }; + if (res) { + LOGV2(22837, + "Cursor timed out", + "cursorId"_attr = cursorId, + "idleSince"_attr = entry.getLastActive().toString()); + } - return killCursorsSatisfying(std::move(lk), opCtx, std::move(pred), now); + return res; + }); } void ClusterCursorManager::killAllCursors(OperationContext* opCtx) { - const auto now = _clockSource->now(); - stdx::unique_lock<Latch> lk(_mutex); - auto pred = [](CursorId, const CursorEntry&) -> bool { return true; }; - - killCursorsSatisfying(std::move(lk), opCtx, std::move(pred), now); + killCursorsSatisfying(opCtx, [](CursorId, const CursorEntry&) -> bool { return true; }); } std::size_t ClusterCursorManager::killCursorsSatisfying( - stdx::unique_lock<Latch> lk, - OperationContext* opCtx, - std::function<bool(CursorId, const CursorEntry&)> pred, - Date_t now) { + OperationContext* opCtx, const std::function<bool(CursorId, const CursorEntry&)>& pred) { invariant(opCtx); - invariant(lk.owns_lock()); + const auto now = _clockSource->now(); + stdx::unique_lock<Latch> lk(_mutex); std::size_t nKilled = 0; _log.push( diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h index 0b1055e1b9c..a10db367159 100644 --- a/src/mongo/s/query/cluster_cursor_manager.h +++ b/src/mongo/s/query/cluster_cursor_manager.h @@ -216,6 +216,151 @@ public: }; /** + * CursorEntry is a movable, non-copyable container for a single cursor. + */ + class CursorEntry { + public: + CursorEntry() = default; + + CursorEntry(std::unique_ptr<ClusterClientCursor> cursor, + CursorType cursorType, + CursorLifetime cursorLifetime, + Date_t lastActive, + UserNameIterator authenticatedUsersIter, + UUID clientUUID, + boost::optional<OperationKey> opKey) + : _cursor(std::move(cursor)), + _cursorType(cursorType), + _cursorLifetime(cursorLifetime), + _lastActive(lastActive), + _lsid(_cursor->getLsid()), + _opKey(std::move(opKey)), + _originatingClient(std::move(clientUUID)), + _authenticatedUsers( + userNameIteratorToContainer<std::vector<UserName>>(authenticatedUsersIter)) { + invariant(_cursor); + } + + CursorEntry(const CursorEntry&) = delete; + CursorEntry& operator=(const CursorEntry&) = delete; + + CursorEntry(CursorEntry&& other) = default; + CursorEntry& operator=(CursorEntry&& other) = default; + + bool isKillPending() const { + // A cursor is kill pending if it's checked out by an OperationContext that was + // interrupted. + if (!_operationUsingCursor) { + return false; + } + + // Must hold the Client lock when calling isKillPending(). + stdx::unique_lock<Client> lk(*_operationUsingCursor->getClient()); + return _operationUsingCursor->isKillPending(); + } + + CursorType getCursorType() const { + return _cursorType; + } + + CursorLifetime getLifetimeType() const { + return _cursorLifetime; + } + + Date_t getLastActive() const { + return _lastActive; + } + + boost::optional<LogicalSessionId> getLsid() const { + return _lsid; + } + + boost::optional<OperationKey> getOperationKey() const { + return _opKey; + } + + /** + * Returns a cursor guard holding the cursor owned by this CursorEntry for an operation to + * use. Only one operation may use the cursor at a time, so callers should check that + * getOperationUsingCursor() returns null before using this function. Callers may not pass + * nullptr for opCtx. Ownership of the cursor is given to the returned + * ClusterClientCursorGuard; callers that want to assume ownership over the cursor directly + * must unpack the cursor from the returned guard. + */ + ClusterClientCursorGuard releaseCursor(OperationContext* opCtx) { + invariant(!_operationUsingCursor); + invariant(_cursor); + invariant(opCtx); + _operationUsingCursor = opCtx; + return ClusterClientCursorGuard(opCtx, std::move(_cursor)); + } + + /** + * Creates a generic cursor from the cursor inside this entry. Should only be called on + * idle cursors. The caller must supply the cursorId and namespace because the CursorEntry + * does not have access to them. Cannot be called if this CursorEntry does not own an + * underlying ClusterClientCursor. + */ + GenericCursor cursorToGenericCursor(CursorId cursorId, const NamespaceString& ns) const; + + OperationContext* getOperationUsingCursor() const { + return _operationUsingCursor; + } + + /** + * Indicate that the cursor is no longer in use by an operation. Once this is called, + * another operation may check the cursor out. + */ + void returnCursor(std::unique_ptr<ClusterClientCursor> cursor) { + invariant(cursor); + invariant(!_cursor); + invariant(_operationUsingCursor); + + _cursor = std::move(cursor); + _operationUsingCursor = nullptr; + } + + void setLastActive(Date_t lastActive) { + _lastActive = lastActive; + } + + UserNameIterator getAuthenticatedUsers() const { + return makeUserNameIterator(_authenticatedUsers.begin(), _authenticatedUsers.end()); + } + + const UUID& originatingClientUuid() const { + return _originatingClient; + } + + private: + std::unique_ptr<ClusterClientCursor> _cursor; + CursorType _cursorType = CursorType::SingleTarget; + CursorLifetime _cursorLifetime = CursorLifetime::Mortal; + Date_t _lastActive; + boost::optional<LogicalSessionId> _lsid; + + /** + * The client OperationKey from the OperationContext at the time of registering a cursor. + */ + boost::optional<OperationKey> _opKey; + + /** + * Current operation using the cursor. Non-null if the cursor is checked out. + */ + OperationContext* _operationUsingCursor = nullptr; + + /** + * The UUID of the Client that opened the cursor. + */ + UUID _originatingClient; + + /** + * The set of users authorized to use this cursor. + */ + std::vector<UserName> _authenticatedUsers; + }; + + /** * Constructs an empty manager. * * Does not take ownership of 'clockSource'. 'clockSource' must refer to a non-null clock @@ -315,6 +460,12 @@ public: Status killCursor(OperationContext* opCtx, const NamespaceString& nss, CursorId cursorId); /** + * Kill the cursors satisfying the given predicate. Returns the number of cursors killed. + */ + std::size_t killCursorsSatisfying( + OperationContext* opCtx, const std::function<bool(CursorId, const CursorEntry&)>& pred); + + /** * Informs the manager that all mortal cursors with a 'last active' time equal to or earlier * than 'cutoff' should be killed. The cursors need not necessarily be in the 'idle' state. * @@ -333,7 +484,6 @@ public: */ void killAllCursors(OperationContext* opCtx); - /** * Returns the number of open cursors on a ClusterCursorManager, broken down by type. * @@ -387,7 +537,6 @@ public: } private: - class CursorEntry; struct CursorEntryContainer; using CursorEntryMap = stdx::unordered_map<CursorId, CursorEntry>; using NssToCursorContainerMap = stdx::unordered_map<NamespaceString, CursorEntryContainer>; @@ -512,145 +661,6 @@ private: void killOperationUsingCursor(WithLock, CursorEntry* entry); /** - * Kill the cursors satisfying the given predicate. Assumes that 'lk' is held upon entry. The - * 'now' parameter is only used for the internal logging mechansim. - * - * Returns the number of cursors killed. - */ - std::size_t killCursorsSatisfying(stdx::unique_lock<Latch> lk, - OperationContext* opCtx, - std::function<bool(CursorId, const CursorEntry&)> pred, - Date_t now); - - /** - * CursorEntry is a moveable, non-copyable container for a single cursor. - */ - class CursorEntry { - CursorEntry(const CursorEntry&) = delete; - CursorEntry& operator=(const CursorEntry&) = delete; - - public: - CursorEntry() = default; - - CursorEntry(std::unique_ptr<ClusterClientCursor> cursor, - CursorType cursorType, - CursorLifetime cursorLifetime, - Date_t lastActive, - UserNameIterator authenticatedUsersIter, - boost::optional<OperationKey> opKey) - : _cursor(std::move(cursor)), - _cursorType(cursorType), - _cursorLifetime(cursorLifetime), - _lastActive(lastActive), - _lsid(_cursor->getLsid()), - _opKey(std::move(opKey)), - _authenticatedUsers( - userNameIteratorToContainer<std::vector<UserName>>(authenticatedUsersIter)) { - invariant(_cursor); - } - - CursorEntry(CursorEntry&& other) = default; - CursorEntry& operator=(CursorEntry&& other) = default; - - bool isKillPending() const { - // A cursor is kill pending if it's checked out by an OperationContext that was - // interrupted. - if (!_operationUsingCursor) { - return false; - } - - // Must hold the Client lock when calling isKillPending(). - stdx::unique_lock<Client> lk(*_operationUsingCursor->getClient()); - return _operationUsingCursor->isKillPending(); - } - - CursorType getCursorType() const { - return _cursorType; - } - - CursorLifetime getLifetimeType() const { - return _cursorLifetime; - } - - Date_t getLastActive() const { - return _lastActive; - } - - boost::optional<LogicalSessionId> getLsid() const { - return _lsid; - } - - boost::optional<OperationKey> getOperationKey() const { - return _opKey; - } - - /** - * Returns a cursor guard holding the cursor owned by this CursorEntry for an operation to - * use. Only one operation may use the cursor at a time, so callers should check that - * getOperationUsingCursor() returns null before using this function. Callers may not pass - * nullptr for opCtx. Ownership of the cursor is given to the returned - * ClusterClientCursorGuard; callers that want to assume ownership over the cursor directly - * must unpack the cursor from the returned guard. - */ - ClusterClientCursorGuard releaseCursor(OperationContext* opCtx) { - invariant(!_operationUsingCursor); - invariant(_cursor); - invariant(opCtx); - _operationUsingCursor = opCtx; - return ClusterClientCursorGuard(opCtx, std::move(_cursor)); - } - - /** - * Creates a generic cursor from the cursor inside this entry. Should only be called on - * idle cursors. The caller must supply the cursorId and namespace because the CursorEntry - * does not have access to them. Cannot be called if this CursorEntry does not own an - * underlying ClusterClientCursor. - */ - GenericCursor cursorToGenericCursor(CursorId cursorId, const NamespaceString& ns) const; - - OperationContext* getOperationUsingCursor() const { - return _operationUsingCursor; - } - - /** - * Indicate that the cursor is no longer in use by an operation. Once this is called, - * another operation may check the cursor out. - */ - void returnCursor(std::unique_ptr<ClusterClientCursor> cursor) { - invariant(cursor); - invariant(!_cursor); - invariant(_operationUsingCursor); - - _cursor = std::move(cursor); - _operationUsingCursor = nullptr; - } - - void setLastActive(Date_t lastActive) { - _lastActive = lastActive; - } - - UserNameIterator getAuthenticatedUsers() const { - return makeUserNameIterator(_authenticatedUsers.begin(), _authenticatedUsers.end()); - } - - private: - std::unique_ptr<ClusterClientCursor> _cursor; - CursorType _cursorType = CursorType::SingleTarget; - CursorLifetime _cursorLifetime = CursorLifetime::Mortal; - Date_t _lastActive; - boost::optional<LogicalSessionId> _lsid; - - // The client OperationKey from the OperationContext at the time of registering a cursor. - boost::optional<OperationKey> _opKey; - - // Current operation using the cursor. Non-null if the cursor is checked out. - OperationContext* _operationUsingCursor = nullptr; - - // The set of users authorized to use this cursor. - const std::vector<UserName> _authenticatedUsers; - }; - - /** * CursorEntryContainer is a moveable, non-copyable container for a set of cursors, where all * contained cursors share the same 32-bit prefix of their cursor id. */ diff --git a/src/mongo/s/query/cluster_cursor_manager_test.cpp b/src/mongo/s/query/cluster_cursor_manager_test.cpp index ef33718577c..21a00bce57b 100644 --- a/src/mongo/s/query/cluster_cursor_manager_test.cpp +++ b/src/mongo/s/query/cluster_cursor_manager_test.cpp @@ -48,7 +48,8 @@ const NamespaceString nss("test.collection"); class ClusterCursorManagerTest : public ServiceContextTest { protected: - ClusterCursorManagerTest() : _opCtx(makeOperationContext()), _manager(&_clockSourceMock) { + ClusterCursorManagerTest() { + _opCtx = makeOperationContext(); LogicalSessionCache::set(getServiceContext(), std::make_unique<LogicalSessionCacheNoop>()); } @@ -56,8 +57,6 @@ protected: _manager.shutdown(_opCtx.get()); } - ServiceContext::UniqueOperationContext _opCtx; - static Status successAuthChecker(UserNameIterator userNames) { return Status::OK(); }; @@ -81,6 +80,13 @@ protected: } /** + * Returns an unowned pointer to the OperationContext owned by this test fixture. + */ + OperationContext* getOperationContext() { + return _opCtx.get(); + } + + /** * Allocates a mock cursor, which can be used with the 'isMockCursorKilled' method below. */ std::unique_ptr<ClusterClientCursorMock> allocateMockCursor( @@ -123,10 +129,9 @@ private: // We use std::list<> for this member (instead of std::vector<>, for example) so that we can // keep references that don't get invalidated as the list grows. std::list<bool> _cursorKilledFlags; - ClockSourceMock _clockSourceMock; - - ClusterCursorManager _manager; + ClusterCursorManager _manager{&_clockSourceMock}; + ServiceContext::UniqueOperationContext _opCtx; }; // Test that registering a cursor and checking it out returns a pin to the same cursor. @@ -134,14 +139,14 @@ TEST_F(ClusterCursorManagerTest, RegisterCursor) { auto cursor = allocateMockCursor(); cursor->queueResult(BSON("a" << 1)); auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), std::move(cursor), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); auto pinnedCursor = - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker); ASSERT_OK(pinnedCursor.getStatus()); auto nextResult = pinnedCursor.getValue()->next(RouterExecStage::ExecContext::kInitialFind); ASSERT_OK(nextResult.getStatus()); @@ -155,7 +160,7 @@ TEST_F(ClusterCursorManagerTest, RegisterCursor) { // Test that registering a cursor returns a non-zero cursor id. TEST_F(ClusterCursorManagerTest, RegisterCursorReturnsNonZeroId) { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -169,14 +174,14 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorBasic) { auto cursor = allocateMockCursor(); cursor->queueResult(BSON("a" << 1)); auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), std::move(cursor), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); auto checkedOutCursor = - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker); ASSERT_OK(checkedOutCursor.getStatus()); ASSERT_EQ(cursorId, checkedOutCursor.getValue().getCursorId()); auto nextResult = checkedOutCursor.getValue()->next(RouterExecStage::ExecContext::kInitialFind); @@ -197,7 +202,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorMultipleCursors) { auto cursor = allocateMockCursor(); cursor->queueResult(BSON("a" << i)); cursorIds[i] = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), std::move(cursor), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -205,8 +210,8 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorMultipleCursors) { UserNameIterator())); } for (int i = 0; i < numCursors; ++i) { - auto pinnedCursor = - getManager()->checkOutCursor(nss, cursorIds[i], _opCtx.get(), successAuthChecker); + auto pinnedCursor = getManager()->checkOutCursor( + nss, cursorIds[i], getOperationContext(), successAuthChecker); ASSERT_OK(pinnedCursor.getStatus()); auto nextResult = pinnedCursor.getValue()->next(RouterExecStage::ExecContext::kInitialFind); ASSERT_OK(nextResult.getStatus()); @@ -221,33 +226,35 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorMultipleCursors) { // Test that checking out a pinned cursor returns an error with code ErrorCodes::CursorInUse. TEST_F(ClusterCursorManagerTest, CheckOutCursorPinned) { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); auto pinnedCursor = - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker); ASSERT_OK(pinnedCursor.getStatus()); - ASSERT_EQ( - ErrorCodes::CursorInUse, - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker).getStatus()); + ASSERT_EQ(ErrorCodes::CursorInUse, + getManager() + ->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker) + .getStatus()); } // Test that checking out a killed cursor returns an error with code ErrorCodes::CursorNotFound. TEST_F(ClusterCursorManagerTest, CheckOutCursorKilled) { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); killCursorFromDifferentOpCtx(nss, cursorId); - ASSERT_EQ( - ErrorCodes::CursorNotFound, - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker).getStatus()); + ASSERT_EQ(ErrorCodes::CursorNotFound, + getManager() + ->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker) + .getStatus()); } // Test that checking out an unknown cursor returns an error with code ErrorCodes::CursorNotFound. @@ -262,7 +269,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongNamespace) { const NamespaceString correctNamespace("test.correct"); const NamespaceString incorrectNamespace("test.incorrect"); auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), correctNamespace, ClusterCursorManager::CursorType::SingleTarget, @@ -278,7 +285,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongNamespace) { // even if there is an existing cursor with the same namespace but a different cursor id. TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongCursorId) { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -286,7 +293,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongCursorId) { UserNameIterator())); ASSERT_EQ(ErrorCodes::CursorNotFound, getManager() - ->checkOutCursor(nss, cursorId + 1, _opCtx.get(), successAuthChecker) + ->checkOutCursor(nss, cursorId + 1, getOperationContext(), successAuthChecker) .getStatus()); } @@ -294,7 +301,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongCursorId) { // current time. TEST_F(ClusterCursorManagerTest, CheckOutCursorUpdateActiveTime) { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -303,23 +310,23 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorUpdateActiveTime) { Date_t cursorRegistrationTime = getClockSource()->now(); getClockSource()->advance(Milliseconds(1)); auto checkedOutCursor = - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker); ASSERT_OK(checkedOutCursor.getStatus()); checkedOutCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted); - getManager()->killMortalCursorsInactiveSince(_opCtx.get(), cursorRegistrationTime); + getManager()->killMortalCursorsInactiveSince(getOperationContext(), cursorRegistrationTime); ASSERT(!isMockCursorKilled(0)); } TEST_F(ClusterCursorManagerTest, CheckOutCursorAuthFails) { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); auto checkedOutCursor = - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), failAuthChecker); + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), failAuthChecker); ASSERT_EQ(checkedOutCursor.getStatus(), ErrorCodes::Unauthorized); } @@ -328,7 +335,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorAuthFails) { // current time. TEST_F(ClusterCursorManagerTest, ReturnCursorUpdateActiveTime) { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -336,18 +343,18 @@ TEST_F(ClusterCursorManagerTest, ReturnCursorUpdateActiveTime) { UserNameIterator())); Date_t cursorCheckOutTime = getClockSource()->now(); auto checkedOutCursor = - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker); ASSERT_OK(checkedOutCursor.getStatus()); getClockSource()->advance(Milliseconds(1)); checkedOutCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted); - getManager()->killMortalCursorsInactiveSince(_opCtx.get(), cursorCheckOutTime); + getManager()->killMortalCursorsInactiveSince(getOperationContext(), cursorCheckOutTime); ASSERT(!isMockCursorKilled(0)); } // Test that killing a pinned cursor by id successfully kills the cursor. TEST_F(ClusterCursorManagerTest, KillUnpinnedCursorBasic) { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -360,19 +367,19 @@ TEST_F(ClusterCursorManagerTest, KillUnpinnedCursorBasic) { // Test that killing a pinned cursor by id successfully kills the cursor. TEST_F(ClusterCursorManagerTest, KillPinnedCursorBasic) { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); auto pinnedCursor = - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker); ASSERT_OK(pinnedCursor.getStatus()); killCursorFromDifferentOpCtx(nss, pinnedCursor.getValue().getCursorId()); // When the cursor is pinned the operation which checked out the cursor should be interrupted. - ASSERT_EQ(_opCtx->checkForInterruptNoAssert(), ErrorCodes::CursorKilled); + ASSERT_EQ(getOperationContext()->checkForInterruptNoAssert(), ErrorCodes::CursorKilled); ASSERT(!isMockCursorKilled(0)); pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted); @@ -387,7 +394,7 @@ TEST_F(ClusterCursorManagerTest, KillCursorMultipleCursors) { // Register cursors and populate 'cursorIds' with the returned cursor ids. for (size_t i = 0; i < numCursors; ++i) { cursorIds[i] = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -396,14 +403,14 @@ TEST_F(ClusterCursorManagerTest, KillCursorMultipleCursors) { } // Kill each cursor and verify that it was successfully killed. for (size_t i = 0; i < numCursors; ++i) { - ASSERT_OK(getManager()->killCursor(_opCtx.get(), nss, cursorIds[i])); + ASSERT_OK(getManager()->killCursor(getOperationContext(), nss, cursorIds[i])); ASSERT(isMockCursorKilled(i)); } } // Test that killing an unknown cursor returns an error with code ErrorCodes::CursorNotFound. TEST_F(ClusterCursorManagerTest, KillCursorUnknown) { - Status killResult = getManager()->killCursor(_opCtx.get(), nss, 5); + Status killResult = getManager()->killCursor(getOperationContext(), nss, 5); ASSERT_EQ(ErrorCodes::CursorNotFound, killResult); } @@ -413,13 +420,14 @@ TEST_F(ClusterCursorManagerTest, KillCursorWrongNamespace) { const NamespaceString correctNamespace("test.correct"); const NamespaceString incorrectNamespace("test.incorrect"); auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), correctNamespace, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); - Status killResult = getManager()->killCursor(_opCtx.get(), incorrectNamespace, cursorId); + Status killResult = + getManager()->killCursor(getOperationContext(), incorrectNamespace, cursorId); ASSERT_EQ(ErrorCodes::CursorNotFound, killResult); } @@ -427,25 +435,25 @@ TEST_F(ClusterCursorManagerTest, KillCursorWrongNamespace) { // even if there is an existing cursor with the same namespace but a different cursor id. TEST_F(ClusterCursorManagerTest, KillCursorWrongCursorId) { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); - Status killResult = getManager()->killCursor(_opCtx.get(), nss, cursorId + 1); + Status killResult = getManager()->killCursor(getOperationContext(), nss, cursorId + 1); ASSERT_EQ(ErrorCodes::CursorNotFound, killResult); } // Test that killing all mortal expired cursors correctly kills a mortal expired cursor. TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceBasic) { - ASSERT_OK(getManager()->registerCursor(_opCtx.get(), + ASSERT_OK(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); - getManager()->killMortalCursorsInactiveSince(_opCtx.get(), getClockSource()->now()); + getManager()->killMortalCursorsInactiveSince(getOperationContext(), getClockSource()->now()); ASSERT(isMockCursorKilled(0)); } @@ -453,25 +461,25 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceBasic) { TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceSkipUnexpired) { Date_t timeBeforeCursorCreation = getClockSource()->now(); getClockSource()->advance(Milliseconds(1)); - ASSERT_OK(getManager()->registerCursor(_opCtx.get(), + ASSERT_OK(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); - getManager()->killMortalCursorsInactiveSince(_opCtx.get(), timeBeforeCursorCreation); + getManager()->killMortalCursorsInactiveSince(getOperationContext(), timeBeforeCursorCreation); ASSERT(!isMockCursorKilled(0)); } // Test that killing all mortal expired cursors does not kill a cursor that is immortal. TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceSkipImmortal) { - ASSERT_OK(getManager()->registerCursor(_opCtx.get(), + ASSERT_OK(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Immortal, UserNameIterator())); - getManager()->killMortalCursorsInactiveSince(_opCtx.get(), getClockSource()->now()); + getManager()->killMortalCursorsInactiveSince(getOperationContext(), getClockSource()->now()); ASSERT(!isMockCursorKilled(0)); } @@ -479,18 +487,18 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceSkipImmortal) { // pinned. TEST_F(ClusterCursorManagerTest, ShouldNotKillPinnedCursors) { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); - auto pin = - assertGet(getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker)); - getManager()->killMortalCursorsInactiveSince(_opCtx.get(), getClockSource()->now()); + auto pin = assertGet( + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker)); + getManager()->killMortalCursorsInactiveSince(getOperationContext(), getClockSource()->now()); ASSERT(!isMockCursorKilled(0)); pin.returnCursor(ClusterCursorManager::CursorState::NotExhausted); - getManager()->killMortalCursorsInactiveSince(_opCtx.get(), getClockSource()->now()); + getManager()->killMortalCursorsInactiveSince(getOperationContext(), getClockSource()->now()); ASSERT(isMockCursorKilled(0)); } @@ -504,7 +512,7 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceMultipleCursors) if (i < numKilledCursorsExpected) { cutoff = getClockSource()->now(); } - ASSERT_OK(getManager()->registerCursor(_opCtx.get(), + ASSERT_OK(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -512,7 +520,7 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceMultipleCursors) UserNameIterator())); getClockSource()->advance(Milliseconds(1)); } - getManager()->killMortalCursorsInactiveSince(_opCtx.get(), cutoff); + getManager()->killMortalCursorsInactiveSince(getOperationContext(), cutoff); for (size_t i = 0; i < numCursors; ++i) { if (i < numKilledCursorsExpected) { ASSERT(isMockCursorKilled(i)); @@ -526,19 +534,121 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceMultipleCursors) TEST_F(ClusterCursorManagerTest, KillAllCursors) { const size_t numCursors = 10; for (size_t i = 0; i < numCursors; ++i) { - ASSERT_OK(getManager()->registerCursor(_opCtx.get(), + ASSERT_OK(getManager()->registerCursor(getOperationContext(), + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal, + UserNameIterator())); + } + getManager()->killAllCursors(getOperationContext()); + for (size_t i = 0; i < numCursors; ++i) { + ASSERT(isMockCursorKilled(i)); + } +} + +TEST_F(ClusterCursorManagerTest, KillCursorsSatisfyingAlwaysTrueKillsAllCursors) { + const size_t numCursors = 10; + for (size_t i = 0; i < numCursors; ++i) { + ASSERT_OK(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); } - getManager()->killAllCursors(_opCtx.get()); + auto pred = [](CursorId, const ClusterCursorManager::CursorEntry&) { return true; }; + auto nKilled = getManager()->killCursorsSatisfying(getOperationContext(), std::move(pred)); + ASSERT_EQ(nKilled, numCursors); for (size_t i = 0; i < numCursors; ++i) { ASSERT(isMockCursorKilled(i)); } } +TEST_F(ClusterCursorManagerTest, KillCursorsSatisfyingAlwaysFalseKillsNoCursors) { + const size_t numCursors = 10; + for (size_t i = 0; i < numCursors; ++i) { + ASSERT_OK(getManager()->registerCursor(getOperationContext(), + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal, + UserNameIterator())); + } + auto pred = [](CursorId, const ClusterCursorManager::CursorEntry&) { return false; }; + auto nKilled = getManager()->killCursorsSatisfying(getOperationContext(), std::move(pred)); + ASSERT_EQ(nKilled, 0); + for (size_t i = 0; i < numCursors; ++i) { + ASSERT(!isMockCursorKilled(i)); + } +} + +TEST_F(ClusterCursorManagerTest, KillCursorsSatisfyingOnlyKillsMatchingSubset) { + const size_t numCursors = 10; + stdx::unordered_set<CursorId> idsToKill; + auto shouldKillCursor = [](size_t idx) { return idx % 2 == 0; }; + for (size_t i = 0; i < numCursors; ++i) { + auto swCursorId = + getManager()->registerCursor(getOperationContext(), + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal, + UserNameIterator()); + ASSERT_OK(swCursorId); + if (shouldKillCursor(i)) + idsToKill.insert(swCursorId.getValue()); + } + auto pred = [&](CursorId id, const ClusterCursorManager::CursorEntry&) { + return idsToKill.contains(id); + }; + auto nKilled = getManager()->killCursorsSatisfying(getOperationContext(), std::move(pred)); + ASSERT_EQ(nKilled, numCursors / 2); + for (size_t i = 0; i < numCursors; ++i) { + if (shouldKillCursor(i)) + ASSERT(isMockCursorKilled(i)); + else + ASSERT(!isMockCursorKilled(i)); + } +} + +// Test that the Client that registered a cursor is correctly recorded. +TEST_F(ClusterCursorManagerTest, CorrectlyRecordsOriginatingClient) { + ASSERT_OK(getManager()->registerCursor(getOperationContext(), + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::MultiTarget, + ClusterCursorManager::CursorLifetime::Mortal, + UserNameIterator())); + // Now insert some cursors under a different client. + const size_t numAltClientCursors = 10; + { + auto otherClient = getServiceContext()->makeClient("otherClient"); + auto otherOpCtx = otherClient->makeOperationContext(); + AlternativeClientRegion acr(otherClient); + for (size_t i = 0; i < numAltClientCursors; ++i) { + ASSERT_OK(getManager()->registerCursor(otherOpCtx.get(), + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::MultiTarget, + ClusterCursorManager::CursorLifetime::Mortal, + UserNameIterator())); + } + } + + auto pred = [client = getClient()](CursorId id, + const ClusterCursorManager::CursorEntry& entry) { + return entry.originatingClientUuid() == client->getUUID(); + }; + // Ensure we only matched the initial client, not the alternate one. + auto nKilled = getManager()->killCursorsSatisfying(getOperationContext(), std::move(pred)); + ASSERT_EQ(nKilled, 1); + ASSERT(isMockCursorKilled(0)); + for (size_t i = 1; i < numAltClientCursors + 1; ++i) { + ASSERT(!isMockCursorKilled(i)); + } +} + // Test that a new ClusterCursorManager's stats() is initially zero for the cursor counts. TEST_F(ClusterCursorManagerTest, StatsInitAsZero) { ASSERT_EQ(0U, getManager()->stats().cursorsMultiTarget); @@ -548,7 +658,7 @@ TEST_F(ClusterCursorManagerTest, StatsInitAsZero) { // Test that registering a sharded cursor updates the corresponding counter in stats(). TEST_F(ClusterCursorManagerTest, StatsRegisterShardedCursor) { - ASSERT_OK(getManager()->registerCursor(_opCtx.get(), + ASSERT_OK(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::MultiTarget, @@ -559,7 +669,7 @@ TEST_F(ClusterCursorManagerTest, StatsRegisterShardedCursor) { // Test that registering a not-sharded cursor updates the corresponding counter in stats(). TEST_F(ClusterCursorManagerTest, StatsRegisterNotShardedCursor) { - ASSERT_OK(getManager()->registerCursor(_opCtx.get(), + ASSERT_OK(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -571,14 +681,14 @@ TEST_F(ClusterCursorManagerTest, StatsRegisterNotShardedCursor) { // Test that checking out a cursor updates the pinned counter in stats(). TEST_F(ClusterCursorManagerTest, StatsPinCursor) { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::MultiTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); auto pinnedCursor = - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker); ASSERT_EQ(1U, getManager()->stats().cursorsPinned); } @@ -587,7 +697,7 @@ TEST_F(ClusterCursorManagerTest, StatsPinCursor) { TEST_F(ClusterCursorManagerTest, StatsRegisterMultipleCursors) { const size_t numShardedCursors = 10; for (size_t i = 0; i < numShardedCursors; ++i) { - ASSERT_OK(getManager()->registerCursor(_opCtx.get(), + ASSERT_OK(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::MultiTarget, @@ -598,7 +708,7 @@ TEST_F(ClusterCursorManagerTest, StatsRegisterMultipleCursors) { } const size_t numNotShardedCursors = 10; for (size_t i = 0; i < numNotShardedCursors; ++i) { - ASSERT_OK(getManager()->registerCursor(_opCtx.get(), + ASSERT_OK(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -612,61 +722,61 @@ TEST_F(ClusterCursorManagerTest, StatsRegisterMultipleCursors) { // Test that killing a sharded cursor decrements the corresponding counter in stats(). TEST_F(ClusterCursorManagerTest, StatsKillShardedCursor) { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::MultiTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); ASSERT_EQ(1U, getManager()->stats().cursorsMultiTarget); - ASSERT_OK(getManager()->killCursor(_opCtx.get(), nss, cursorId)); + ASSERT_OK(getManager()->killCursor(getOperationContext(), nss, cursorId)); ASSERT_EQ(0U, getManager()->stats().cursorsMultiTarget); } // Test that killing a not-sharded cursor decrements the corresponding counter in stats(). TEST_F(ClusterCursorManagerTest, StatsKillNotShardedCursor) { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); ASSERT_EQ(1U, getManager()->stats().cursorsSingleTarget); - ASSERT_OK(getManager()->killCursor(_opCtx.get(), nss, cursorId)); + ASSERT_OK(getManager()->killCursor(getOperationContext(), nss, cursorId)); ASSERT_EQ(0U, getManager()->stats().cursorsSingleTarget); } // Test that killing a pinned cursor decrements the corresponding counter in stats(). TEST_F(ClusterCursorManagerTest, StatsKillPinnedCursor) { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::MultiTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); auto pinnedCursor = - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker); ASSERT_EQ(1U, getManager()->stats().cursorsPinned); killCursorFromDifferentOpCtx(nss, cursorId); - ASSERT_EQ(_opCtx->checkForInterruptNoAssert(), ErrorCodes::CursorKilled); + ASSERT_EQ(getOperationContext()->checkForInterruptNoAssert(), ErrorCodes::CursorKilled); ASSERT_EQ(0U, getManager()->stats().cursorsPinned); } // Test that exhausting a sharded cursor decrements the corresponding counter in stats(). TEST_F(ClusterCursorManagerTest, StatsExhaustShardedCursor) { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::MultiTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); auto pinnedCursor = - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker); ASSERT_OK(pinnedCursor.getStatus()); ASSERT_OK( pinnedCursor.getValue()->next(RouterExecStage::ExecContext::kInitialFind).getStatus()); @@ -678,14 +788,14 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustShardedCursor) { // Test that exhausting a not-sharded cursor decrements the corresponding counter in stats(). TEST_F(ClusterCursorManagerTest, StatsExhaustNotShardedCursor) { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); auto pinnedCursor = - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker); ASSERT_OK(pinnedCursor.getStatus()); ASSERT_OK( pinnedCursor.getValue()->next(RouterExecStage::ExecContext::kInitialFind).getStatus()); @@ -698,14 +808,14 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustNotShardedCursor) { // stats(). TEST_F(ClusterCursorManagerTest, StatsExhaustPinnedCursor) { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); auto pinnedCursor = - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker); ASSERT_OK(pinnedCursor.getStatus()); ASSERT_OK( pinnedCursor.getValue()->next(RouterExecStage::ExecContext::kInitialFind).getStatus()); @@ -718,14 +828,14 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustPinnedCursor) { // stats(). TEST_F(ClusterCursorManagerTest, StatsCheckInWithoutExhaustingPinnedCursor) { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); auto pinnedCursor = - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker); ASSERT_OK(pinnedCursor.getStatus()); ASSERT_OK( pinnedCursor.getValue()->next(RouterExecStage::ExecContext::kInitialFind).getStatus()); @@ -737,7 +847,7 @@ TEST_F(ClusterCursorManagerTest, StatsCheckInWithoutExhaustingPinnedCursor) { // Test that getting the namespace for a cursor returns the correct namespace. TEST_F(ClusterCursorManagerTest, GetNamespaceForCursorIdBasic) { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -756,7 +866,7 @@ TEST_F(ClusterCursorManagerTest, GetNamespaceForCursorIdMultipleCursorsSameNames std::vector<CursorId> cursorIds(numCursors); for (size_t i = 0; i < numCursors; ++i) { cursorIds[i] = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -779,7 +889,7 @@ TEST_F(ClusterCursorManagerTest, GetNamespaceForCursorIdMultipleCursorsDifferent for (size_t i = 0; i < numCursors; ++i) { NamespaceString cursorNamespace(std::string(str::stream() << "test.collection" << i)); auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), cursorNamespace, ClusterCursorManager::CursorType::SingleTarget, @@ -811,21 +921,21 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorDefaultConstructor) { // cursor. TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorNotExhausted) { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); auto registeredCursor = - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker); ASSERT_OK(registeredCursor.getStatus()); ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId()); ASSERT_NE(0, cursorId); registeredCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted); ASSERT_EQ(0, registeredCursor.getValue().getCursorId()); auto checkedOutCursor = - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker); ASSERT_OK(checkedOutCursor.getStatus()); } @@ -833,14 +943,14 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorNotExhausted) { // cursor, and leaves the pin owning no cursor. TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhausted) { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); auto registeredCursor = - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker); ASSERT_OK(registeredCursor.getStatus()); ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId()); ASSERT_NE(0, cursorId); @@ -850,8 +960,9 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhausted) { ASSERT_EQ(0, registeredCursor.getValue().getCursorId()); // Cursor should have been killed and destroyed. - ASSERT_NOT_OK( - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker).getStatus()); + ASSERT_NOT_OK(getManager() + ->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker) + .getStatus()); ASSERT(isMockCursorKilled(0)); } @@ -861,14 +972,14 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhaustedWithNonExhaust auto mockCursor = allocateMockCursor(); auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), std::move(mockCursor), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); auto registeredCursor = - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker); ASSERT_OK(registeredCursor.getStatus()); ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId()); ASSERT_NE(0, cursorId); @@ -879,22 +990,23 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhaustedWithNonExhaust // Cursor should be killed as soon as it's checked in. ASSERT(isMockCursorKilled(0)); - ASSERT_NOT_OK( - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker).getStatus()); + ASSERT_NOT_OK(getManager() + ->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker) + .getStatus()); } // Test that the PinnedCursor move assignment operator correctly kills the cursor if it has not yet // been returned. TEST_F(ClusterCursorManagerTest, PinnedCursorMoveAssignmentKill) { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); auto pinnedCursor = - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker); pinnedCursor = ClusterCursorManager::PinnedCursor(); ASSERT(isMockCursorKilled(0)); } @@ -903,14 +1015,14 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorMoveAssignmentKill) { TEST_F(ClusterCursorManagerTest, PinnedCursorDestructorKill) { { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); auto pinnedCursor = - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker); } ASSERT(isMockCursorKilled(0)); } @@ -921,14 +1033,14 @@ TEST_F(ClusterCursorManagerTest, RemotesExhausted) { ASSERT_FALSE(mockCursor->remotesExhausted()); auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), std::move(mockCursor), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); auto pinnedCursor = - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker); ASSERT_OK(pinnedCursor.getStatus()); ASSERT_FALSE(pinnedCursor.getValue()->remotesExhausted()); } @@ -937,23 +1049,23 @@ TEST_F(ClusterCursorManagerTest, RemotesExhausted) { TEST_F(ClusterCursorManagerTest, DoNotDestroyKilledPinnedCursors) { const Date_t cutoff = getClockSource()->now(); auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); auto pinnedCursor = - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker); ASSERT_OK(pinnedCursor.getStatus()); killCursorFromDifferentOpCtx(nss, cursorId); - ASSERT_EQ(_opCtx->checkForInterruptNoAssert(), ErrorCodes::CursorKilled); + ASSERT_EQ(getOperationContext()->checkForInterruptNoAssert(), ErrorCodes::CursorKilled); ASSERT(!isMockCursorKilled(0)); // The cursor cleanup system should not destroy the cursor either. - getManager()->killMortalCursorsInactiveSince(_opCtx.get(), cutoff); + getManager()->killMortalCursorsInactiveSince(getOperationContext(), cutoff); // The cursor's operation context should be marked as interrupted, but the cursor itself should // not have been destroyed. @@ -975,14 +1087,14 @@ TEST_F(ClusterCursorManagerTest, CursorStoresAPIParameters) { cursor->setAPIParameters(apiParams); auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), std::move(cursor), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); - auto pinnedCursor = - assertGet(getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker)); + auto pinnedCursor = assertGet( + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker)); auto storedAPIParams = pinnedCursor->getAPIParameters(); ASSERT_EQ("2", *storedAPIParams.getAPIVersion()); @@ -991,7 +1103,7 @@ TEST_F(ClusterCursorManagerTest, CursorStoresAPIParameters) { } TEST_F(ClusterCursorManagerTest, CannotRegisterCursorDuringShutdown) { - ASSERT_OK(getManager()->registerCursor(_opCtx.get(), + ASSERT_OK(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -999,12 +1111,12 @@ TEST_F(ClusterCursorManagerTest, CannotRegisterCursorDuringShutdown) { UserNameIterator())); ASSERT(!isMockCursorKilled(0)); - getManager()->shutdown(_opCtx.get()); + getManager()->shutdown(getOperationContext()); ASSERT(isMockCursorKilled(0)); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, - getManager()->registerCursor(_opCtx.get(), + getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -1014,7 +1126,7 @@ TEST_F(ClusterCursorManagerTest, CannotRegisterCursorDuringShutdown) { TEST_F(ClusterCursorManagerTest, PinnedCursorNotKilledOnShutdown) { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -1022,10 +1134,10 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorNotKilledOnShutdown) { UserNameIterator())); auto pinnedCursor = - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); - getManager()->shutdown(_opCtx.get()); + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker); + getManager()->shutdown(getOperationContext()); - ASSERT_EQ(_opCtx->checkForInterruptNoAssert(), ErrorCodes::CursorKilled); + ASSERT_EQ(getOperationContext()->checkForInterruptNoAssert(), ErrorCodes::CursorKilled); ASSERT(!isMockCursorKilled(0)); // Even if it's checked back in as not exhausted, it should have been marked as killed when @@ -1036,7 +1148,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorNotKilledOnShutdown) { TEST_F(ClusterCursorManagerTest, CannotCheckoutCursorDuringShutdown) { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -1044,13 +1156,14 @@ TEST_F(ClusterCursorManagerTest, CannotCheckoutCursorDuringShutdown) { UserNameIterator())); ASSERT(!isMockCursorKilled(0)); - getManager()->shutdown(_opCtx.get()); + getManager()->shutdown(getOperationContext()); ASSERT(isMockCursorKilled(0)); - ASSERT_EQUALS( - ErrorCodes::ShutdownInProgress, - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker).getStatus()); + ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, + getManager() + ->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker) + .getStatus()); } /** @@ -1058,7 +1171,7 @@ TEST_F(ClusterCursorManagerTest, CannotCheckoutCursorDuringShutdown) { */ TEST_F(ClusterCursorManagerTest, CursorsWithoutSessions) { // Add a cursor with no session to the cursor manager. - ASSERT_OK(getManager()->registerCursor(_opCtx.get(), + ASSERT_OK(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -1078,7 +1191,7 @@ TEST_F(ClusterCursorManagerTest, OneCursorWithASession) { // Add a cursor with a session to the cursor manager. auto lsid = makeLogicalSessionIdForTest(); auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(lsid), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -1097,7 +1210,7 @@ TEST_F(ClusterCursorManagerTest, OneCursorWithASession) { ASSERT(cursors.find(cursorId) != cursors.end()); // Remove the cursor from the manager. - ASSERT_OK(getManager()->killCursor(_opCtx.get(), nss, cursorId)); + ASSERT_OK(getManager()->killCursor(getOperationContext(), nss, cursorId)); // There should be no more cursor entries by session id. LogicalSessionIdSet sessions; @@ -1113,7 +1226,7 @@ TEST_F(ClusterCursorManagerTest, GetSessionIdsWhileCheckedOut) { // Add a cursor with a session to the cursor manager. auto lsid = makeLogicalSessionIdForTest(); auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(lsid), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -1121,7 +1234,8 @@ TEST_F(ClusterCursorManagerTest, GetSessionIdsWhileCheckedOut) { UserNameIterator())); // Check the cursor out, then try to append cursors, see that we get one. - auto res = getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); + auto res = + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker); ASSERT(res.isOK()); auto cursors = getManager()->getCursorsForSession(lsid); @@ -1135,14 +1249,14 @@ TEST_F(ClusterCursorManagerTest, MultipleCursorsWithSameSession) { // Add two cursors on the same session to the cursor manager. auto lsid = makeLogicalSessionIdForTest(); auto cursorId1 = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(lsid), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); auto cursorId2 = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(lsid), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -1162,7 +1276,7 @@ TEST_F(ClusterCursorManagerTest, MultipleCursorsWithSameSession) { ASSERT(cursors.find(cursorId2) != cursors.end()); // Remove one cursor from the manager. - ASSERT_OK(getManager()->killCursor(_opCtx.get(), nss, cursorId1)); + ASSERT_OK(getManager()->killCursor(getOperationContext(), nss, cursorId1)); // Should still be able to retrieve the session. lsids.clear(); @@ -1185,7 +1299,7 @@ TEST_F(ClusterCursorManagerTest, MultipleCursorsMultipleSessions) { // Register two cursors with different lsids, and one without. CursorId cursor1 = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(lsid1), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -1193,14 +1307,14 @@ TEST_F(ClusterCursorManagerTest, MultipleCursorsMultipleSessions) { UserNameIterator())); CursorId cursor2 = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(lsid2), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); - ASSERT_OK(getManager()->registerCursor(_opCtx.get(), + ASSERT_OK(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -1231,7 +1345,7 @@ TEST_F(ClusterCursorManagerTest, ManyCursorsManySessions) { const int count = 10000; for (int i = 0; i < count; i++) { auto lsid = makeLogicalSessionIdForTest(); - ASSERT_OK(getManager()->registerCursor(_opCtx.get(), + ASSERT_OK(getManager()->registerCursor(getOperationContext(), allocateMockCursor(lsid), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -1247,26 +1361,27 @@ TEST_F(ClusterCursorManagerTest, ManyCursorsManySessions) { TEST_F(ClusterCursorManagerTest, CheckAuthForKillCursors) { auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); - ASSERT_EQ( - ErrorCodes::CursorNotFound, - getManager()->checkAuthForKillCursors(_opCtx.get(), nss, cursorId + 1, successAuthChecker)); + ASSERT_EQ(ErrorCodes::CursorNotFound, + getManager()->checkAuthForKillCursors( + getOperationContext(), nss, cursorId + 1, successAuthChecker)); ASSERT_EQ(ErrorCodes::Unauthorized, - getManager()->checkAuthForKillCursors(_opCtx.get(), nss, cursorId, failAuthChecker)); - ASSERT_OK( - getManager()->checkAuthForKillCursors(_opCtx.get(), nss, cursorId, successAuthChecker)); + getManager()->checkAuthForKillCursors( + getOperationContext(), nss, cursorId, failAuthChecker)); + ASSERT_OK(getManager()->checkAuthForKillCursors( + getOperationContext(), nss, cursorId, successAuthChecker)); } TEST_F(ClusterCursorManagerTest, PinnedCursorReturnsUnderlyingCursorTxnNumber) { const TxnNumber txnNumber = 5; auto cursorId = assertGet( - getManager()->registerCursor(_opCtx.get(), + getManager()->registerCursor(getOperationContext(), allocateMockCursor(makeLogicalSessionIdForTest(), txnNumber), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -1274,7 +1389,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnsUnderlyingCursorTxnNumber) { UserNameIterator())); auto pinnedCursor = - getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker); ASSERT_OK(pinnedCursor.getStatus()); // The underlying cursor's txnNumber should be returned. @@ -1283,7 +1398,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnsUnderlyingCursorTxnNumber) { } TEST_F(ClusterCursorManagerTest, CursorsWithoutOperationKeys) { - ASSERT_OK(getManager()->registerCursor(_opCtx.get(), + ASSERT_OK(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -1295,9 +1410,9 @@ TEST_F(ClusterCursorManagerTest, CursorsWithoutOperationKeys) { TEST_F(ClusterCursorManagerTest, OneCursorWithAnOperationKey) { auto opKey = UUID::gen(); - _opCtx->setOperationKey(opKey); + getOperationContext()->setOperationKey(opKey); auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -1310,7 +1425,7 @@ TEST_F(ClusterCursorManagerTest, OneCursorWithAnOperationKey) { ASSERT(cursors.find(cursorId) != cursors.end()); // Remove the cursor from the manager. - ASSERT_OK(getManager()->killCursor(_opCtx.get(), nss, cursorId)); + ASSERT_OK(getManager()->killCursor(getOperationContext(), nss, cursorId)); // There should be no more cursor entries for this operation key. ASSERT(getManager()->getCursorsForOpKeys({opKey}).empty()); @@ -1318,9 +1433,9 @@ TEST_F(ClusterCursorManagerTest, OneCursorWithAnOperationKey) { TEST_F(ClusterCursorManagerTest, GetCursorByOpKeyWhileCheckedOut) { auto opKey = UUID::gen(); - _opCtx->setOperationKey(opKey); + getOperationContext()->setOperationKey(opKey); auto cursorId = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -1328,7 +1443,8 @@ TEST_F(ClusterCursorManagerTest, GetCursorByOpKeyWhileCheckedOut) { UserNameIterator())); // Check the cursor out then look it up by operation key. - auto res = getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); + auto res = + getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker); ASSERT(res.isOK()); auto cursors = getManager()->getCursorsForOpKeys({opKey}); @@ -1337,16 +1453,16 @@ TEST_F(ClusterCursorManagerTest, GetCursorByOpKeyWhileCheckedOut) { TEST_F(ClusterCursorManagerTest, MultipleCursorsWithSameOperationKey) { auto opKey = UUID::gen(); - _opCtx->setOperationKey(opKey); + getOperationContext()->setOperationKey(opKey); auto cursorId1 = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal, UserNameIterator())); auto cursorId2 = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, @@ -1360,7 +1476,7 @@ TEST_F(ClusterCursorManagerTest, MultipleCursorsWithSameOperationKey) { ASSERT(cursors.find(cursorId2) != cursors.end()); // Remove one cursor from the manager. - ASSERT_OK(getManager()->killCursor(_opCtx.get(), nss, cursorId1)); + ASSERT_OK(getManager()->killCursor(getOperationContext(), nss, cursorId1)); // Should still be able to retrieve remaining cursor by session. cursors = getManager()->getCursorsForOpKeys({opKey}); @@ -1371,7 +1487,7 @@ TEST_F(ClusterCursorManagerTest, MultipleCursorsWithSameOperationKey) { TEST_F(ClusterCursorManagerTest, MultipleCursorsMultipleOperationKeys) { auto opKey1 = UUID::gen(); auto opKey2 = UUID::gen(); - _opCtx->setOperationKey(opKey1); + getOperationContext()->setOperationKey(opKey1); auto client2 = getServiceContext()->makeClient("client2"); auto opCtx2 = client2->makeOperationContext(); @@ -1379,7 +1495,7 @@ TEST_F(ClusterCursorManagerTest, MultipleCursorsMultipleOperationKeys) { // Register two cursors with different operation keys. CursorId cursor1 = - assertGet(getManager()->registerCursor(_opCtx.get(), + assertGet(getManager()->registerCursor(getOperationContext(), allocateMockCursor(), nss, ClusterCursorManager::CursorType::SingleTarget, diff --git a/src/mongo/s/service_entry_point_mongos.cpp b/src/mongo/s/service_entry_point_mongos.cpp index c9ec249b0f7..7e07d31c688 100644 --- a/src/mongo/s/service_entry_point_mongos.cpp +++ b/src/mongo/s/service_entry_point_mongos.cpp @@ -49,6 +49,9 @@ #include "mongo/rpc/warn_deprecated_wire_ops.h" #include "mongo/s/cluster_last_error_info.h" #include "mongo/s/commands/strategy.h" +#include "mongo/s/grid.h" +#include "mongo/s/load_balancer_support.h" +#include "mongo/s/query/cluster_cursor_manager.h" namespace mongo { @@ -288,4 +291,14 @@ Future<DbResponse> ServiceEntryPointMongos::handleRequest(OperationContext* opCt return hr->run(); } +void ServiceEntryPointMongos::onClientDisconnect(Client* client) { + if (load_balancer_support::isFromLoadBalancer(client)) { + auto ccm = Grid::get(client->getServiceContext())->getCursorManager(); + auto killerOperationContext = client->makeOperationContext(); + ccm->killCursorsSatisfying(killerOperationContext.get(), + [&](CursorId, const ClusterCursorManager::CursorEntry& entry) { + return entry.originatingClientUuid() == client->getUUID(); + }); + } +} } // namespace mongo diff --git a/src/mongo/s/service_entry_point_mongos.h b/src/mongo/s/service_entry_point_mongos.h index 17d99fa3a01..444f89eef4c 100644 --- a/src/mongo/s/service_entry_point_mongos.h +++ b/src/mongo/s/service_entry_point_mongos.h @@ -46,6 +46,8 @@ public: using ServiceEntryPointImpl::ServiceEntryPointImpl; Future<DbResponse> handleRequest(OperationContext* opCtx, const Message& request) noexcept override; + + void onClientDisconnect(Client* client) override; }; } // namespace mongo diff --git a/src/mongo/transport/service_entry_point.h b/src/mongo/transport/service_entry_point.h index 3e8d279b7b9..44786921474 100644 --- a/src/mongo/transport/service_entry_point.h +++ b/src/mongo/transport/service_entry_point.h @@ -32,6 +32,7 @@ #include <limits> #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/client.h" #include "mongo/db/dbmessage.h" #include "mongo/transport/session.h" #include "mongo/util/future.h" @@ -102,6 +103,13 @@ public: */ virtual void onEndSession(const transport::SessionHandle&) {} + /** + * Optional handler which is invoked after a client disconnect. A client disconnect occurs when + * the connection between the mongo process and client is closed for any reason, and is defined + * by the destruction and cleanup of the ServiceStateMachine that manages the client. + */ + virtual void onClientDisconnect(Client* client) {} + protected: ServiceEntryPoint() = default; }; diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp index 94616fed5c3..492ede06f9f 100644 --- a/src/mongo/transport/service_state_machine.cpp +++ b/src/mongo/transport/service_state_machine.cpp @@ -644,9 +644,10 @@ void ServiceStateMachine::Impl::cleanupSession(const Status& status) { LOGV2_DEBUG(5127900, 2, "Ending session", "error"_attr = status); cleanupExhaustResources(); + auto client = _clientStrand->getClientPointer(); + _sep->onClientDisconnect(client); { - auto client = _clientStrand->getClientPointer(); stdx::lock_guard lk(*client); transport::ServiceExecutorContext::reset(client); } diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp index cae5c81e41d..88485924b76 100644 --- a/src/mongo/transport/service_state_machine_test.cpp +++ b/src/mongo/transport/service_state_machine_test.cpp @@ -258,6 +258,10 @@ public: return _data->isConnected.load(); } + int onClientDisconnectCalledTimes() const { + return _onClientDisconnectCalled; + } + friend constexpr StringData toString(FailureCondition fail) { switch (fail) { case FailureCondition::kNone: @@ -452,6 +456,10 @@ private: _stateQueue.push(IngressState::kEnd); } + void _onClientDisconnect() { + ++_onClientDisconnectCalled; + } + const boost::optional<ServiceExecutor::ThreadingModel> _threadingModel; boost::optional<ServiceExecutor::ThreadingModel> _originalThreadingModel; @@ -463,6 +471,8 @@ private: std::shared_ptr<ServiceStateMachineTest::Session> _session; SingleProducerSingleConsumerQueue<IngressState> _stateQueue; + + int _onClientDisconnectCalled{0}; }; /** @@ -552,6 +562,11 @@ public: _fixture->_cleanup(session); } + void onClientDisconnect(Client* client) override { + invariant(client); + _fixture->_onClientDisconnect(); + } + private: ServiceStateMachineTest* const _fixture; }; @@ -794,6 +809,17 @@ TEST_F(ServiceStateMachineTest, EndBeforeStartSession) { startSession(); } +TEST_F(ServiceStateMachineTest, OnClientDisconnectCalledOnCleanup) { + initNewSession(); + startSession(); + ASSERT_EQ(popIngressState(), IngressState::kSource); + ASSERT_EQ(onClientDisconnectCalledTimes(), 0); + endSession(); + ASSERT_EQ(popIngressState(), IngressState::kEnd); + joinSession(); + ASSERT_EQ(onClientDisconnectCalledTimes(), 1); +} + TEST_F(ServiceStateMachineWithDedicatedThreadsTest, DefaultLoop) { auto runner = StepRunner(this); |