summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGeorge Wangensteen <george.wangensteen@mongodb.com>2021-10-13 19:38:12 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-16 17:51:42 +0000
commitca1a96490687e5bff52c35bb5eec8408e0912291 (patch)
tree49d69e9c356e4a9b65920afb3b8568e02f0c72fa
parent0a38f458292dc64d5be84663bec8102d9a232f1e (diff)
downloadmongo-ca1a96490687e5bff52c35bb5eec8408e0912291.tar.gz
SERVER-58503 Kill open cursors for a connection when a load balanced connection on mongos is closed
(cherry picked from commit b429d5dda98bbe18ab0851ffd1729d3b57fc8a4e)
-rw-r--r--jstests/sharding/load_balancer_support/disconnect_kills_cursors.js97
-rw-r--r--src/mongo/s/SConscript1
-rw-r--r--src/mongo/s/load_balancer_support.cpp20
-rw-r--r--src/mongo/s/load_balancer_support.h7
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.cpp51
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.h292
-rw-r--r--src/mongo/s/query/cluster_cursor_manager_test.cpp424
-rw-r--r--src/mongo/s/service_entry_point_mongos.cpp13
-rw-r--r--src/mongo/s/service_entry_point_mongos.h2
-rw-r--r--src/mongo/transport/service_entry_point.h8
-rw-r--r--src/mongo/transport/service_state_machine.cpp3
-rw-r--r--src/mongo/transport/service_state_machine_test.cpp26
12 files changed, 611 insertions, 333 deletions
diff --git a/jstests/sharding/load_balancer_support/disconnect_kills_cursors.js b/jstests/sharding/load_balancer_support/disconnect_kills_cursors.js
new file mode 100644
index 00000000000..133f6bad748
--- /dev/null
+++ b/jstests/sharding/load_balancer_support/disconnect_kills_cursors.js
@@ -0,0 +1,97 @@
+/**
+ * @tags: [featureFlagLoadBalancer]
+ *
+ * Tests that when a load-balanced client disconnects, its cursors are killed.
+ */
+
+(() => {
+ "use strict";
+ load("jstests/libs/parallelTester.js");
+
+ function setupShardedCollection(st, dbName, collName) {
+ const fullNss = dbName + "." + collName;
+ const admin = st.s.getDB("admin");
+ // Shard collection; ensure docs on each shard
+ assert.commandWorked(admin.runCommand({enableSharding: dbName}));
+ assert.commandWorked(admin.runCommand({movePrimary: dbName, to: st.shard0.shardName}));
+ assert.commandWorked(admin.runCommand({shardCollection: fullNss, key: {_id: 1}}));
+ assert.commandWorked(admin.runCommand({split: fullNss, middle: {_id: 0}}));
+ assert.commandWorked(
+ admin.runCommand({moveChunk: fullNss, find: {_id: 0}, to: st.shard1.shardName}));
+
+ // Insert some docs on each shard
+ let coll = admin.getSiblingDB(dbName).getCollection(collName);
+ var bulk = coll.initializeUnorderedBulkOp();
+ for (let i = -150; i < 150; i++) {
+ bulk.insert({_id: i});
+ }
+ assert.commandWorked(bulk.execute());
+ }
+
+ function openCursor(mongosHost, dbName, collName, countdownLatch, identifyingComment) {
+ const newDBConn = new Mongo(mongosHost).getDB(dbName);
+ assert.commandWorked(newDBConn.getSiblingDB("admin").adminCommand(
+ {configureFailPoint: "clientIsFromLoadBalancer", mode: "alwaysOn"}));
+ let cmdRes =
+ newDBConn.runCommand({find: collName, comment: identifyingComment, batchSize: 1});
+ assert.commandWorked(cmdRes);
+ const cursorId = cmdRes.cursor.id;
+ assert.neq(cursorId, NumberLong(0));
+ // Wait until countdownLatch value is 0.
+ countdownLatch.await();
+ return cursorId;
+ }
+
+ const testName = "load_balanced_disconnect_kills_cursors";
+ let st = new ShardingTest({shards: 2, mongos: 1});
+ const dbName = "foo";
+ const collName = "bar";
+ const admin = st.s.getDB("admin");
+ const identifyingComment = "loadBalancedDisconnectComment";
+
+ setupShardedCollection(st, dbName, collName);
+ let countdownLatch = new CountDownLatch(1);
+
+ let cursorOpeningThread =
+ new Thread(openCursor, st.s.host, dbName, collName, countdownLatch, identifyingComment);
+ cursorOpeningThread.start();
+
+ let idleCursor = {};
+
+ // Wait until we can see the cursor opened by cursorOpeningThread, identified by the comment, as
+ // idle.
+ assert.soon(() => {
+ const curopCursor = admin.aggregate([
+ {$currentOp: {allUsers: true, idleCursors: true, localOps: true}},
+ {$match: {type: "idleCursor"}},
+ {$match: {"cursor.originatingCommand.comment": identifyingComment}}
+ ]);
+ if (curopCursor.hasNext()) {
+ idleCursor = curopCursor.next().cursor;
+ return true;
+ }
+ return false;
+ }, "Couldn't find cursor opened by cursorOpeningThread");
+
+ // We've found the cursor opened by cursorOpeningThread.
+ // We now join that thread, and therefore end its connection to the server.
+ countdownLatch.countDown();
+ cursorOpeningThread.join();
+ assert.commandWorked(
+ admin.adminCommand({configureFailPoint: "clientIsFromLoadBalancer", mode: "off"}));
+
+ let cursorId = cursorOpeningThread.returnData();
+ assert.eq(idleCursor.cursorId, cursorId);
+
+ // Make sure we can't find that cursor anymore/it has been killed.
+ const numCursorsFoundWithId =
+ admin
+ .aggregate([
+ {$currentOp: {allUsers: true, idleCursors: true, localOps: true}},
+ {$match: {type: "idleCursor"}},
+ {$match: {"cursor.cursorId": cursorId}}
+ ])
+ .itcount();
+ assert.eq(numCursorsFoundWithId, NumberLong(0));
+ st.stop();
+})();
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 98e81ffb1be..4eeb40e6e38 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -554,6 +554,7 @@ env.Library(
'cluster_last_error_info',
'commands/cluster_commands',
'committed_optime_metadata_hook',
+ 'load_balancer_support',
'mongos_initializers',
'mongos_topology_coordinator',
'query/cluster_cursor_cleanup_job',
diff --git a/src/mongo/s/load_balancer_support.cpp b/src/mongo/s/load_balancer_support.cpp
index b1ebf6bb6d3..1f2e1dcaf9d 100644
--- a/src/mongo/s/load_balancer_support.cpp
+++ b/src/mongo/s/load_balancer_support.cpp
@@ -45,11 +45,6 @@ namespace {
MONGO_FAIL_POINT_DEFINE(clientIsFromLoadBalancer);
-bool featureEnabled() {
- return feature_flags::gFeatureFlagLoadBalancer.isEnabled(
- serverGlobalParams.featureCompatibility);
-}
-
struct PerService {
/**
* When a client reaches a mongos through a load balancer, the `serviceId`
@@ -92,15 +87,20 @@ const auto getPerServiceState = ServiceContext::declareDecoration<PerService>();
const auto getPerClientState = Client::declareDecoration<PerClient>();
} // namespace
+bool isEnabled() {
+ return feature_flags::gFeatureFlagLoadBalancer.isEnabled(
+ serverGlobalParams.featureCompatibility);
+}
+
void setClientIsFromLoadBalancer(Client* client) {
- if (!featureEnabled())
+ if (!isEnabled())
return;
auto& perClient = getPerClientState(client);
perClient.setIsFromLoadBalancer();
}
void handleHello(OperationContext* opCtx, BSONObjBuilder* result, bool helloHasLoadBalancedOption) {
- if (!featureEnabled())
+ if (!isEnabled())
return;
auto& perClient = getPerClientState(opCtx->getClient());
if (perClient.didHello() || !perClient.isFromLoadBalancer())
@@ -115,4 +115,10 @@ void handleHello(OperationContext* opCtx, BSONObjBuilder* result, bool helloHasL
perClient.setDidHello();
}
+bool isFromLoadBalancer(Client* client) {
+ if (!isEnabled()) {
+ return false;
+ }
+ return getPerClientState(client).isFromLoadBalancer();
+}
} // namespace mongo::load_balancer_support
diff --git a/src/mongo/s/load_balancer_support.h b/src/mongo/s/load_balancer_support.h
index c3d46d65823..1000a2f32f7 100644
--- a/src/mongo/s/load_balancer_support.h
+++ b/src/mongo/s/load_balancer_support.h
@@ -54,4 +54,11 @@ void setClientIsFromLoadBalancer(Client* client);
*/
void handleHello(OperationContext* opCtx, BSONObjBuilder* result, bool helloHasLoadBalancedOption);
+bool isFromLoadBalancer(Client* client);
+
+/**
+ * Returns whether the feature flag for load balancer support is enabled.
+ */
+bool isEnabled();
+
} // namespace mongo::load_balancer_support
diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp
index be5f905ff35..668795d20fd 100644
--- a/src/mongo/s/query/cluster_cursor_manager.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager.cpp
@@ -234,6 +234,7 @@ StatusWith<CursorId> ClusterCursorManager::registerCursor(
cursorLifetime,
now,
authenticatedUsers,
+ opCtx->getClient()->getUUID(),
opCtx->getOperationKey()));
invariant(emplaceResult.second);
_log.push({LogEvent::Type::kRegisterComplete, cursorId, now, nss});
@@ -408,46 +409,36 @@ void ClusterCursorManager::detachAndKillCursor(stdx::unique_lock<Latch> lk,
std::size_t ClusterCursorManager::killMortalCursorsInactiveSince(OperationContext* opCtx,
Date_t cutoff) {
- const auto now = _clockSource->now();
- stdx::unique_lock<Latch> lk(_mutex);
-
- auto pred = [cutoff](CursorId cursorId, const CursorEntry& entry) -> bool {
- if (entry.getLifetimeType() == CursorLifetime::Immortal ||
- entry.getOperationUsingCursor() ||
- (entry.getLsid() && !enableTimeoutOfInactiveSessionCursors.load())) {
- return false;
- }
-
- bool res = entry.getLastActive() <= cutoff;
+ return killCursorsSatisfying(
+ opCtx, [cutoff](CursorId cursorId, const CursorEntry& entry) -> bool {
+ if (entry.getLifetimeType() == CursorLifetime::Immortal ||
+ entry.getOperationUsingCursor() ||
+ (entry.getLsid() && !enableTimeoutOfInactiveSessionCursors.load())) {
+ return false;
+ }
- if (res) {
- LOGV2(22837,
- "Cursor timed out",
- "cursorId"_attr = cursorId,
- "idleSince"_attr = entry.getLastActive().toString());
- }
+ bool res = entry.getLastActive() <= cutoff;
- return res;
- };
+ if (res) {
+ LOGV2(22837,
+ "Cursor timed out",
+ "cursorId"_attr = cursorId,
+ "idleSince"_attr = entry.getLastActive().toString());
+ }
- return killCursorsSatisfying(std::move(lk), opCtx, std::move(pred), now);
+ return res;
+ });
}
void ClusterCursorManager::killAllCursors(OperationContext* opCtx) {
- const auto now = _clockSource->now();
- stdx::unique_lock<Latch> lk(_mutex);
- auto pred = [](CursorId, const CursorEntry&) -> bool { return true; };
-
- killCursorsSatisfying(std::move(lk), opCtx, std::move(pred), now);
+ killCursorsSatisfying(opCtx, [](CursorId, const CursorEntry&) -> bool { return true; });
}
std::size_t ClusterCursorManager::killCursorsSatisfying(
- stdx::unique_lock<Latch> lk,
- OperationContext* opCtx,
- std::function<bool(CursorId, const CursorEntry&)> pred,
- Date_t now) {
+ OperationContext* opCtx, const std::function<bool(CursorId, const CursorEntry&)>& pred) {
invariant(opCtx);
- invariant(lk.owns_lock());
+ const auto now = _clockSource->now();
+ stdx::unique_lock<Latch> lk(_mutex);
std::size_t nKilled = 0;
_log.push(
diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h
index 0b1055e1b9c..a10db367159 100644
--- a/src/mongo/s/query/cluster_cursor_manager.h
+++ b/src/mongo/s/query/cluster_cursor_manager.h
@@ -216,6 +216,151 @@ public:
};
/**
+ * CursorEntry is a movable, non-copyable container for a single cursor.
+ */
+ class CursorEntry {
+ public:
+ CursorEntry() = default;
+
+ CursorEntry(std::unique_ptr<ClusterClientCursor> cursor,
+ CursorType cursorType,
+ CursorLifetime cursorLifetime,
+ Date_t lastActive,
+ UserNameIterator authenticatedUsersIter,
+ UUID clientUUID,
+ boost::optional<OperationKey> opKey)
+ : _cursor(std::move(cursor)),
+ _cursorType(cursorType),
+ _cursorLifetime(cursorLifetime),
+ _lastActive(lastActive),
+ _lsid(_cursor->getLsid()),
+ _opKey(std::move(opKey)),
+ _originatingClient(std::move(clientUUID)),
+ _authenticatedUsers(
+ userNameIteratorToContainer<std::vector<UserName>>(authenticatedUsersIter)) {
+ invariant(_cursor);
+ }
+
+ CursorEntry(const CursorEntry&) = delete;
+ CursorEntry& operator=(const CursorEntry&) = delete;
+
+ CursorEntry(CursorEntry&& other) = default;
+ CursorEntry& operator=(CursorEntry&& other) = default;
+
+ bool isKillPending() const {
+ // A cursor is kill pending if it's checked out by an OperationContext that was
+ // interrupted.
+ if (!_operationUsingCursor) {
+ return false;
+ }
+
+ // Must hold the Client lock when calling isKillPending().
+ stdx::unique_lock<Client> lk(*_operationUsingCursor->getClient());
+ return _operationUsingCursor->isKillPending();
+ }
+
+ CursorType getCursorType() const {
+ return _cursorType;
+ }
+
+ CursorLifetime getLifetimeType() const {
+ return _cursorLifetime;
+ }
+
+ Date_t getLastActive() const {
+ return _lastActive;
+ }
+
+ boost::optional<LogicalSessionId> getLsid() const {
+ return _lsid;
+ }
+
+ boost::optional<OperationKey> getOperationKey() const {
+ return _opKey;
+ }
+
+ /**
+ * Returns a cursor guard holding the cursor owned by this CursorEntry for an operation to
+ * use. Only one operation may use the cursor at a time, so callers should check that
+ * getOperationUsingCursor() returns null before using this function. Callers may not pass
+ * nullptr for opCtx. Ownership of the cursor is given to the returned
+ * ClusterClientCursorGuard; callers that want to assume ownership over the cursor directly
+ * must unpack the cursor from the returned guard.
+ */
+ ClusterClientCursorGuard releaseCursor(OperationContext* opCtx) {
+ invariant(!_operationUsingCursor);
+ invariant(_cursor);
+ invariant(opCtx);
+ _operationUsingCursor = opCtx;
+ return ClusterClientCursorGuard(opCtx, std::move(_cursor));
+ }
+
+ /**
+ * Creates a generic cursor from the cursor inside this entry. Should only be called on
+ * idle cursors. The caller must supply the cursorId and namespace because the CursorEntry
+ * does not have access to them. Cannot be called if this CursorEntry does not own an
+ * underlying ClusterClientCursor.
+ */
+ GenericCursor cursorToGenericCursor(CursorId cursorId, const NamespaceString& ns) const;
+
+ OperationContext* getOperationUsingCursor() const {
+ return _operationUsingCursor;
+ }
+
+ /**
+ * Indicate that the cursor is no longer in use by an operation. Once this is called,
+ * another operation may check the cursor out.
+ */
+ void returnCursor(std::unique_ptr<ClusterClientCursor> cursor) {
+ invariant(cursor);
+ invariant(!_cursor);
+ invariant(_operationUsingCursor);
+
+ _cursor = std::move(cursor);
+ _operationUsingCursor = nullptr;
+ }
+
+ void setLastActive(Date_t lastActive) {
+ _lastActive = lastActive;
+ }
+
+ UserNameIterator getAuthenticatedUsers() const {
+ return makeUserNameIterator(_authenticatedUsers.begin(), _authenticatedUsers.end());
+ }
+
+ const UUID& originatingClientUuid() const {
+ return _originatingClient;
+ }
+
+ private:
+ std::unique_ptr<ClusterClientCursor> _cursor;
+ CursorType _cursorType = CursorType::SingleTarget;
+ CursorLifetime _cursorLifetime = CursorLifetime::Mortal;
+ Date_t _lastActive;
+ boost::optional<LogicalSessionId> _lsid;
+
+ /**
+ * The client OperationKey from the OperationContext at the time of registering a cursor.
+ */
+ boost::optional<OperationKey> _opKey;
+
+ /**
+ * Current operation using the cursor. Non-null if the cursor is checked out.
+ */
+ OperationContext* _operationUsingCursor = nullptr;
+
+ /**
+ * The UUID of the Client that opened the cursor.
+ */
+ UUID _originatingClient;
+
+ /**
+ * The set of users authorized to use this cursor.
+ */
+ std::vector<UserName> _authenticatedUsers;
+ };
+
+ /**
* Constructs an empty manager.
*
* Does not take ownership of 'clockSource'. 'clockSource' must refer to a non-null clock
@@ -315,6 +460,12 @@ public:
Status killCursor(OperationContext* opCtx, const NamespaceString& nss, CursorId cursorId);
/**
+ * Kill the cursors satisfying the given predicate. Returns the number of cursors killed.
+ */
+ std::size_t killCursorsSatisfying(
+ OperationContext* opCtx, const std::function<bool(CursorId, const CursorEntry&)>& pred);
+
+ /**
* Informs the manager that all mortal cursors with a 'last active' time equal to or earlier
* than 'cutoff' should be killed. The cursors need not necessarily be in the 'idle' state.
*
@@ -333,7 +484,6 @@ public:
*/
void killAllCursors(OperationContext* opCtx);
-
/**
* Returns the number of open cursors on a ClusterCursorManager, broken down by type.
*
@@ -387,7 +537,6 @@ public:
}
private:
- class CursorEntry;
struct CursorEntryContainer;
using CursorEntryMap = stdx::unordered_map<CursorId, CursorEntry>;
using NssToCursorContainerMap = stdx::unordered_map<NamespaceString, CursorEntryContainer>;
@@ -512,145 +661,6 @@ private:
void killOperationUsingCursor(WithLock, CursorEntry* entry);
/**
- * Kill the cursors satisfying the given predicate. Assumes that 'lk' is held upon entry. The
- * 'now' parameter is only used for the internal logging mechansim.
- *
- * Returns the number of cursors killed.
- */
- std::size_t killCursorsSatisfying(stdx::unique_lock<Latch> lk,
- OperationContext* opCtx,
- std::function<bool(CursorId, const CursorEntry&)> pred,
- Date_t now);
-
- /**
- * CursorEntry is a moveable, non-copyable container for a single cursor.
- */
- class CursorEntry {
- CursorEntry(const CursorEntry&) = delete;
- CursorEntry& operator=(const CursorEntry&) = delete;
-
- public:
- CursorEntry() = default;
-
- CursorEntry(std::unique_ptr<ClusterClientCursor> cursor,
- CursorType cursorType,
- CursorLifetime cursorLifetime,
- Date_t lastActive,
- UserNameIterator authenticatedUsersIter,
- boost::optional<OperationKey> opKey)
- : _cursor(std::move(cursor)),
- _cursorType(cursorType),
- _cursorLifetime(cursorLifetime),
- _lastActive(lastActive),
- _lsid(_cursor->getLsid()),
- _opKey(std::move(opKey)),
- _authenticatedUsers(
- userNameIteratorToContainer<std::vector<UserName>>(authenticatedUsersIter)) {
- invariant(_cursor);
- }
-
- CursorEntry(CursorEntry&& other) = default;
- CursorEntry& operator=(CursorEntry&& other) = default;
-
- bool isKillPending() const {
- // A cursor is kill pending if it's checked out by an OperationContext that was
- // interrupted.
- if (!_operationUsingCursor) {
- return false;
- }
-
- // Must hold the Client lock when calling isKillPending().
- stdx::unique_lock<Client> lk(*_operationUsingCursor->getClient());
- return _operationUsingCursor->isKillPending();
- }
-
- CursorType getCursorType() const {
- return _cursorType;
- }
-
- CursorLifetime getLifetimeType() const {
- return _cursorLifetime;
- }
-
- Date_t getLastActive() const {
- return _lastActive;
- }
-
- boost::optional<LogicalSessionId> getLsid() const {
- return _lsid;
- }
-
- boost::optional<OperationKey> getOperationKey() const {
- return _opKey;
- }
-
- /**
- * Returns a cursor guard holding the cursor owned by this CursorEntry for an operation to
- * use. Only one operation may use the cursor at a time, so callers should check that
- * getOperationUsingCursor() returns null before using this function. Callers may not pass
- * nullptr for opCtx. Ownership of the cursor is given to the returned
- * ClusterClientCursorGuard; callers that want to assume ownership over the cursor directly
- * must unpack the cursor from the returned guard.
- */
- ClusterClientCursorGuard releaseCursor(OperationContext* opCtx) {
- invariant(!_operationUsingCursor);
- invariant(_cursor);
- invariant(opCtx);
- _operationUsingCursor = opCtx;
- return ClusterClientCursorGuard(opCtx, std::move(_cursor));
- }
-
- /**
- * Creates a generic cursor from the cursor inside this entry. Should only be called on
- * idle cursors. The caller must supply the cursorId and namespace because the CursorEntry
- * does not have access to them. Cannot be called if this CursorEntry does not own an
- * underlying ClusterClientCursor.
- */
- GenericCursor cursorToGenericCursor(CursorId cursorId, const NamespaceString& ns) const;
-
- OperationContext* getOperationUsingCursor() const {
- return _operationUsingCursor;
- }
-
- /**
- * Indicate that the cursor is no longer in use by an operation. Once this is called,
- * another operation may check the cursor out.
- */
- void returnCursor(std::unique_ptr<ClusterClientCursor> cursor) {
- invariant(cursor);
- invariant(!_cursor);
- invariant(_operationUsingCursor);
-
- _cursor = std::move(cursor);
- _operationUsingCursor = nullptr;
- }
-
- void setLastActive(Date_t lastActive) {
- _lastActive = lastActive;
- }
-
- UserNameIterator getAuthenticatedUsers() const {
- return makeUserNameIterator(_authenticatedUsers.begin(), _authenticatedUsers.end());
- }
-
- private:
- std::unique_ptr<ClusterClientCursor> _cursor;
- CursorType _cursorType = CursorType::SingleTarget;
- CursorLifetime _cursorLifetime = CursorLifetime::Mortal;
- Date_t _lastActive;
- boost::optional<LogicalSessionId> _lsid;
-
- // The client OperationKey from the OperationContext at the time of registering a cursor.
- boost::optional<OperationKey> _opKey;
-
- // Current operation using the cursor. Non-null if the cursor is checked out.
- OperationContext* _operationUsingCursor = nullptr;
-
- // The set of users authorized to use this cursor.
- const std::vector<UserName> _authenticatedUsers;
- };
-
- /**
* CursorEntryContainer is a moveable, non-copyable container for a set of cursors, where all
* contained cursors share the same 32-bit prefix of their cursor id.
*/
diff --git a/src/mongo/s/query/cluster_cursor_manager_test.cpp b/src/mongo/s/query/cluster_cursor_manager_test.cpp
index ef33718577c..21a00bce57b 100644
--- a/src/mongo/s/query/cluster_cursor_manager_test.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager_test.cpp
@@ -48,7 +48,8 @@ const NamespaceString nss("test.collection");
class ClusterCursorManagerTest : public ServiceContextTest {
protected:
- ClusterCursorManagerTest() : _opCtx(makeOperationContext()), _manager(&_clockSourceMock) {
+ ClusterCursorManagerTest() {
+ _opCtx = makeOperationContext();
LogicalSessionCache::set(getServiceContext(), std::make_unique<LogicalSessionCacheNoop>());
}
@@ -56,8 +57,6 @@ protected:
_manager.shutdown(_opCtx.get());
}
- ServiceContext::UniqueOperationContext _opCtx;
-
static Status successAuthChecker(UserNameIterator userNames) {
return Status::OK();
};
@@ -81,6 +80,13 @@ protected:
}
/**
+ * Returns an unowned pointer to the OperationContext owned by this test fixture.
+ */
+ OperationContext* getOperationContext() {
+ return _opCtx.get();
+ }
+
+ /**
* Allocates a mock cursor, which can be used with the 'isMockCursorKilled' method below.
*/
std::unique_ptr<ClusterClientCursorMock> allocateMockCursor(
@@ -123,10 +129,9 @@ private:
// We use std::list<> for this member (instead of std::vector<>, for example) so that we can
// keep references that don't get invalidated as the list grows.
std::list<bool> _cursorKilledFlags;
-
ClockSourceMock _clockSourceMock;
-
- ClusterCursorManager _manager;
+ ClusterCursorManager _manager{&_clockSourceMock};
+ ServiceContext::UniqueOperationContext _opCtx;
};
// Test that registering a cursor and checking it out returns a pin to the same cursor.
@@ -134,14 +139,14 @@ TEST_F(ClusterCursorManagerTest, RegisterCursor) {
auto cursor = allocateMockCursor();
cursor->queueResult(BSON("a" << 1));
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
std::move(cursor),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
auto pinnedCursor =
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker);
ASSERT_OK(pinnedCursor.getStatus());
auto nextResult = pinnedCursor.getValue()->next(RouterExecStage::ExecContext::kInitialFind);
ASSERT_OK(nextResult.getStatus());
@@ -155,7 +160,7 @@ TEST_F(ClusterCursorManagerTest, RegisterCursor) {
// Test that registering a cursor returns a non-zero cursor id.
TEST_F(ClusterCursorManagerTest, RegisterCursorReturnsNonZeroId) {
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -169,14 +174,14 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorBasic) {
auto cursor = allocateMockCursor();
cursor->queueResult(BSON("a" << 1));
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
std::move(cursor),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
auto checkedOutCursor =
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker);
ASSERT_OK(checkedOutCursor.getStatus());
ASSERT_EQ(cursorId, checkedOutCursor.getValue().getCursorId());
auto nextResult = checkedOutCursor.getValue()->next(RouterExecStage::ExecContext::kInitialFind);
@@ -197,7 +202,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorMultipleCursors) {
auto cursor = allocateMockCursor();
cursor->queueResult(BSON("a" << i));
cursorIds[i] =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
std::move(cursor),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -205,8 +210,8 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorMultipleCursors) {
UserNameIterator()));
}
for (int i = 0; i < numCursors; ++i) {
- auto pinnedCursor =
- getManager()->checkOutCursor(nss, cursorIds[i], _opCtx.get(), successAuthChecker);
+ auto pinnedCursor = getManager()->checkOutCursor(
+ nss, cursorIds[i], getOperationContext(), successAuthChecker);
ASSERT_OK(pinnedCursor.getStatus());
auto nextResult = pinnedCursor.getValue()->next(RouterExecStage::ExecContext::kInitialFind);
ASSERT_OK(nextResult.getStatus());
@@ -221,33 +226,35 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorMultipleCursors) {
// Test that checking out a pinned cursor returns an error with code ErrorCodes::CursorInUse.
TEST_F(ClusterCursorManagerTest, CheckOutCursorPinned) {
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
auto pinnedCursor =
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker);
ASSERT_OK(pinnedCursor.getStatus());
- ASSERT_EQ(
- ErrorCodes::CursorInUse,
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker).getStatus());
+ ASSERT_EQ(ErrorCodes::CursorInUse,
+ getManager()
+ ->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker)
+ .getStatus());
}
// Test that checking out a killed cursor returns an error with code ErrorCodes::CursorNotFound.
TEST_F(ClusterCursorManagerTest, CheckOutCursorKilled) {
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
killCursorFromDifferentOpCtx(nss, cursorId);
- ASSERT_EQ(
- ErrorCodes::CursorNotFound,
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker).getStatus());
+ ASSERT_EQ(ErrorCodes::CursorNotFound,
+ getManager()
+ ->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker)
+ .getStatus());
}
// Test that checking out an unknown cursor returns an error with code ErrorCodes::CursorNotFound.
@@ -262,7 +269,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongNamespace) {
const NamespaceString correctNamespace("test.correct");
const NamespaceString incorrectNamespace("test.incorrect");
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
correctNamespace,
ClusterCursorManager::CursorType::SingleTarget,
@@ -278,7 +285,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongNamespace) {
// even if there is an existing cursor with the same namespace but a different cursor id.
TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongCursorId) {
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -286,7 +293,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongCursorId) {
UserNameIterator()));
ASSERT_EQ(ErrorCodes::CursorNotFound,
getManager()
- ->checkOutCursor(nss, cursorId + 1, _opCtx.get(), successAuthChecker)
+ ->checkOutCursor(nss, cursorId + 1, getOperationContext(), successAuthChecker)
.getStatus());
}
@@ -294,7 +301,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongCursorId) {
// current time.
TEST_F(ClusterCursorManagerTest, CheckOutCursorUpdateActiveTime) {
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -303,23 +310,23 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorUpdateActiveTime) {
Date_t cursorRegistrationTime = getClockSource()->now();
getClockSource()->advance(Milliseconds(1));
auto checkedOutCursor =
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker);
ASSERT_OK(checkedOutCursor.getStatus());
checkedOutCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted);
- getManager()->killMortalCursorsInactiveSince(_opCtx.get(), cursorRegistrationTime);
+ getManager()->killMortalCursorsInactiveSince(getOperationContext(), cursorRegistrationTime);
ASSERT(!isMockCursorKilled(0));
}
TEST_F(ClusterCursorManagerTest, CheckOutCursorAuthFails) {
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
auto checkedOutCursor =
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), failAuthChecker);
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), failAuthChecker);
ASSERT_EQ(checkedOutCursor.getStatus(), ErrorCodes::Unauthorized);
}
@@ -328,7 +335,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorAuthFails) {
// current time.
TEST_F(ClusterCursorManagerTest, ReturnCursorUpdateActiveTime) {
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -336,18 +343,18 @@ TEST_F(ClusterCursorManagerTest, ReturnCursorUpdateActiveTime) {
UserNameIterator()));
Date_t cursorCheckOutTime = getClockSource()->now();
auto checkedOutCursor =
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker);
ASSERT_OK(checkedOutCursor.getStatus());
getClockSource()->advance(Milliseconds(1));
checkedOutCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted);
- getManager()->killMortalCursorsInactiveSince(_opCtx.get(), cursorCheckOutTime);
+ getManager()->killMortalCursorsInactiveSince(getOperationContext(), cursorCheckOutTime);
ASSERT(!isMockCursorKilled(0));
}
// Test that killing a pinned cursor by id successfully kills the cursor.
TEST_F(ClusterCursorManagerTest, KillUnpinnedCursorBasic) {
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -360,19 +367,19 @@ TEST_F(ClusterCursorManagerTest, KillUnpinnedCursorBasic) {
// Test that killing a pinned cursor by id successfully kills the cursor.
TEST_F(ClusterCursorManagerTest, KillPinnedCursorBasic) {
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
auto pinnedCursor =
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker);
ASSERT_OK(pinnedCursor.getStatus());
killCursorFromDifferentOpCtx(nss, pinnedCursor.getValue().getCursorId());
// When the cursor is pinned the operation which checked out the cursor should be interrupted.
- ASSERT_EQ(_opCtx->checkForInterruptNoAssert(), ErrorCodes::CursorKilled);
+ ASSERT_EQ(getOperationContext()->checkForInterruptNoAssert(), ErrorCodes::CursorKilled);
ASSERT(!isMockCursorKilled(0));
pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted);
@@ -387,7 +394,7 @@ TEST_F(ClusterCursorManagerTest, KillCursorMultipleCursors) {
// Register cursors and populate 'cursorIds' with the returned cursor ids.
for (size_t i = 0; i < numCursors; ++i) {
cursorIds[i] =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -396,14 +403,14 @@ TEST_F(ClusterCursorManagerTest, KillCursorMultipleCursors) {
}
// Kill each cursor and verify that it was successfully killed.
for (size_t i = 0; i < numCursors; ++i) {
- ASSERT_OK(getManager()->killCursor(_opCtx.get(), nss, cursorIds[i]));
+ ASSERT_OK(getManager()->killCursor(getOperationContext(), nss, cursorIds[i]));
ASSERT(isMockCursorKilled(i));
}
}
// Test that killing an unknown cursor returns an error with code ErrorCodes::CursorNotFound.
TEST_F(ClusterCursorManagerTest, KillCursorUnknown) {
- Status killResult = getManager()->killCursor(_opCtx.get(), nss, 5);
+ Status killResult = getManager()->killCursor(getOperationContext(), nss, 5);
ASSERT_EQ(ErrorCodes::CursorNotFound, killResult);
}
@@ -413,13 +420,14 @@ TEST_F(ClusterCursorManagerTest, KillCursorWrongNamespace) {
const NamespaceString correctNamespace("test.correct");
const NamespaceString incorrectNamespace("test.incorrect");
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
correctNamespace,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
- Status killResult = getManager()->killCursor(_opCtx.get(), incorrectNamespace, cursorId);
+ Status killResult =
+ getManager()->killCursor(getOperationContext(), incorrectNamespace, cursorId);
ASSERT_EQ(ErrorCodes::CursorNotFound, killResult);
}
@@ -427,25 +435,25 @@ TEST_F(ClusterCursorManagerTest, KillCursorWrongNamespace) {
// even if there is an existing cursor with the same namespace but a different cursor id.
TEST_F(ClusterCursorManagerTest, KillCursorWrongCursorId) {
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
- Status killResult = getManager()->killCursor(_opCtx.get(), nss, cursorId + 1);
+ Status killResult = getManager()->killCursor(getOperationContext(), nss, cursorId + 1);
ASSERT_EQ(ErrorCodes::CursorNotFound, killResult);
}
// Test that killing all mortal expired cursors correctly kills a mortal expired cursor.
TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceBasic) {
- ASSERT_OK(getManager()->registerCursor(_opCtx.get(),
+ ASSERT_OK(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
- getManager()->killMortalCursorsInactiveSince(_opCtx.get(), getClockSource()->now());
+ getManager()->killMortalCursorsInactiveSince(getOperationContext(), getClockSource()->now());
ASSERT(isMockCursorKilled(0));
}
@@ -453,25 +461,25 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceBasic) {
TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceSkipUnexpired) {
Date_t timeBeforeCursorCreation = getClockSource()->now();
getClockSource()->advance(Milliseconds(1));
- ASSERT_OK(getManager()->registerCursor(_opCtx.get(),
+ ASSERT_OK(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
- getManager()->killMortalCursorsInactiveSince(_opCtx.get(), timeBeforeCursorCreation);
+ getManager()->killMortalCursorsInactiveSince(getOperationContext(), timeBeforeCursorCreation);
ASSERT(!isMockCursorKilled(0));
}
// Test that killing all mortal expired cursors does not kill a cursor that is immortal.
TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceSkipImmortal) {
- ASSERT_OK(getManager()->registerCursor(_opCtx.get(),
+ ASSERT_OK(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Immortal,
UserNameIterator()));
- getManager()->killMortalCursorsInactiveSince(_opCtx.get(), getClockSource()->now());
+ getManager()->killMortalCursorsInactiveSince(getOperationContext(), getClockSource()->now());
ASSERT(!isMockCursorKilled(0));
}
@@ -479,18 +487,18 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceSkipImmortal) {
// pinned.
TEST_F(ClusterCursorManagerTest, ShouldNotKillPinnedCursors) {
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
- auto pin =
- assertGet(getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker));
- getManager()->killMortalCursorsInactiveSince(_opCtx.get(), getClockSource()->now());
+ auto pin = assertGet(
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker));
+ getManager()->killMortalCursorsInactiveSince(getOperationContext(), getClockSource()->now());
ASSERT(!isMockCursorKilled(0));
pin.returnCursor(ClusterCursorManager::CursorState::NotExhausted);
- getManager()->killMortalCursorsInactiveSince(_opCtx.get(), getClockSource()->now());
+ getManager()->killMortalCursorsInactiveSince(getOperationContext(), getClockSource()->now());
ASSERT(isMockCursorKilled(0));
}
@@ -504,7 +512,7 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceMultipleCursors)
if (i < numKilledCursorsExpected) {
cutoff = getClockSource()->now();
}
- ASSERT_OK(getManager()->registerCursor(_opCtx.get(),
+ ASSERT_OK(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -512,7 +520,7 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceMultipleCursors)
UserNameIterator()));
getClockSource()->advance(Milliseconds(1));
}
- getManager()->killMortalCursorsInactiveSince(_opCtx.get(), cutoff);
+ getManager()->killMortalCursorsInactiveSince(getOperationContext(), cutoff);
for (size_t i = 0; i < numCursors; ++i) {
if (i < numKilledCursorsExpected) {
ASSERT(isMockCursorKilled(i));
@@ -526,19 +534,121 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceMultipleCursors)
TEST_F(ClusterCursorManagerTest, KillAllCursors) {
const size_t numCursors = 10;
for (size_t i = 0; i < numCursors; ++i) {
- ASSERT_OK(getManager()->registerCursor(_opCtx.get(),
+ ASSERT_OK(getManager()->registerCursor(getOperationContext(),
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal,
+ UserNameIterator()));
+ }
+ getManager()->killAllCursors(getOperationContext());
+ for (size_t i = 0; i < numCursors; ++i) {
+ ASSERT(isMockCursorKilled(i));
+ }
+}
+
+TEST_F(ClusterCursorManagerTest, KillCursorsSatisfyingAlwaysTrueKillsAllCursors) {
+ const size_t numCursors = 10;
+ for (size_t i = 0; i < numCursors; ++i) {
+ ASSERT_OK(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
}
- getManager()->killAllCursors(_opCtx.get());
+ auto pred = [](CursorId, const ClusterCursorManager::CursorEntry&) { return true; };
+ auto nKilled = getManager()->killCursorsSatisfying(getOperationContext(), std::move(pred));
+ ASSERT_EQ(nKilled, numCursors);
for (size_t i = 0; i < numCursors; ++i) {
ASSERT(isMockCursorKilled(i));
}
}
+TEST_F(ClusterCursorManagerTest, KillCursorsSatisfyingAlwaysFalseKillsNoCursors) {
+ const size_t numCursors = 10;
+ for (size_t i = 0; i < numCursors; ++i) {
+ ASSERT_OK(getManager()->registerCursor(getOperationContext(),
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal,
+ UserNameIterator()));
+ }
+ auto pred = [](CursorId, const ClusterCursorManager::CursorEntry&) { return false; };
+ auto nKilled = getManager()->killCursorsSatisfying(getOperationContext(), std::move(pred));
+ ASSERT_EQ(nKilled, 0);
+ for (size_t i = 0; i < numCursors; ++i) {
+ ASSERT(!isMockCursorKilled(i));
+ }
+}
+
+TEST_F(ClusterCursorManagerTest, KillCursorsSatisfyingOnlyKillsMatchingSubset) {
+ const size_t numCursors = 10;
+ stdx::unordered_set<CursorId> idsToKill;
+ auto shouldKillCursor = [](size_t idx) { return idx % 2 == 0; };
+ for (size_t i = 0; i < numCursors; ++i) {
+ auto swCursorId =
+ getManager()->registerCursor(getOperationContext(),
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal,
+ UserNameIterator());
+ ASSERT_OK(swCursorId);
+ if (shouldKillCursor(i))
+ idsToKill.insert(swCursorId.getValue());
+ }
+ auto pred = [&](CursorId id, const ClusterCursorManager::CursorEntry&) {
+ return idsToKill.contains(id);
+ };
+ auto nKilled = getManager()->killCursorsSatisfying(getOperationContext(), std::move(pred));
+ ASSERT_EQ(nKilled, numCursors / 2);
+ for (size_t i = 0; i < numCursors; ++i) {
+ if (shouldKillCursor(i))
+ ASSERT(isMockCursorKilled(i));
+ else
+ ASSERT(!isMockCursorKilled(i));
+ }
+}
+
+// Test that the Client that registered a cursor is correctly recorded.
+TEST_F(ClusterCursorManagerTest, CorrectlyRecordsOriginatingClient) {
+ ASSERT_OK(getManager()->registerCursor(getOperationContext(),
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::MultiTarget,
+ ClusterCursorManager::CursorLifetime::Mortal,
+ UserNameIterator()));
+ // Now insert some cursors under a different client.
+ const size_t numAltClientCursors = 10;
+ {
+ auto otherClient = getServiceContext()->makeClient("otherClient");
+ auto otherOpCtx = otherClient->makeOperationContext();
+ AlternativeClientRegion acr(otherClient);
+ for (size_t i = 0; i < numAltClientCursors; ++i) {
+ ASSERT_OK(getManager()->registerCursor(otherOpCtx.get(),
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::MultiTarget,
+ ClusterCursorManager::CursorLifetime::Mortal,
+ UserNameIterator()));
+ }
+ }
+
+ auto pred = [client = getClient()](CursorId id,
+ const ClusterCursorManager::CursorEntry& entry) {
+ return entry.originatingClientUuid() == client->getUUID();
+ };
+ // Ensure we only matched the initial client, not the alternate one.
+ auto nKilled = getManager()->killCursorsSatisfying(getOperationContext(), std::move(pred));
+ ASSERT_EQ(nKilled, 1);
+ ASSERT(isMockCursorKilled(0));
+ for (size_t i = 1; i < numAltClientCursors + 1; ++i) {
+ ASSERT(!isMockCursorKilled(i));
+ }
+}
+
// Test that a new ClusterCursorManager's stats() is initially zero for the cursor counts.
TEST_F(ClusterCursorManagerTest, StatsInitAsZero) {
ASSERT_EQ(0U, getManager()->stats().cursorsMultiTarget);
@@ -548,7 +658,7 @@ TEST_F(ClusterCursorManagerTest, StatsInitAsZero) {
// Test that registering a sharded cursor updates the corresponding counter in stats().
TEST_F(ClusterCursorManagerTest, StatsRegisterShardedCursor) {
- ASSERT_OK(getManager()->registerCursor(_opCtx.get(),
+ ASSERT_OK(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::MultiTarget,
@@ -559,7 +669,7 @@ TEST_F(ClusterCursorManagerTest, StatsRegisterShardedCursor) {
// Test that registering a not-sharded cursor updates the corresponding counter in stats().
TEST_F(ClusterCursorManagerTest, StatsRegisterNotShardedCursor) {
- ASSERT_OK(getManager()->registerCursor(_opCtx.get(),
+ ASSERT_OK(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -571,14 +681,14 @@ TEST_F(ClusterCursorManagerTest, StatsRegisterNotShardedCursor) {
// Test that checking out a cursor updates the pinned counter in stats().
TEST_F(ClusterCursorManagerTest, StatsPinCursor) {
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::MultiTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
auto pinnedCursor =
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker);
ASSERT_EQ(1U, getManager()->stats().cursorsPinned);
}
@@ -587,7 +697,7 @@ TEST_F(ClusterCursorManagerTest, StatsPinCursor) {
TEST_F(ClusterCursorManagerTest, StatsRegisterMultipleCursors) {
const size_t numShardedCursors = 10;
for (size_t i = 0; i < numShardedCursors; ++i) {
- ASSERT_OK(getManager()->registerCursor(_opCtx.get(),
+ ASSERT_OK(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::MultiTarget,
@@ -598,7 +708,7 @@ TEST_F(ClusterCursorManagerTest, StatsRegisterMultipleCursors) {
}
const size_t numNotShardedCursors = 10;
for (size_t i = 0; i < numNotShardedCursors; ++i) {
- ASSERT_OK(getManager()->registerCursor(_opCtx.get(),
+ ASSERT_OK(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -612,61 +722,61 @@ TEST_F(ClusterCursorManagerTest, StatsRegisterMultipleCursors) {
// Test that killing a sharded cursor decrements the corresponding counter in stats().
TEST_F(ClusterCursorManagerTest, StatsKillShardedCursor) {
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::MultiTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
ASSERT_EQ(1U, getManager()->stats().cursorsMultiTarget);
- ASSERT_OK(getManager()->killCursor(_opCtx.get(), nss, cursorId));
+ ASSERT_OK(getManager()->killCursor(getOperationContext(), nss, cursorId));
ASSERT_EQ(0U, getManager()->stats().cursorsMultiTarget);
}
// Test that killing a not-sharded cursor decrements the corresponding counter in stats().
TEST_F(ClusterCursorManagerTest, StatsKillNotShardedCursor) {
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
ASSERT_EQ(1U, getManager()->stats().cursorsSingleTarget);
- ASSERT_OK(getManager()->killCursor(_opCtx.get(), nss, cursorId));
+ ASSERT_OK(getManager()->killCursor(getOperationContext(), nss, cursorId));
ASSERT_EQ(0U, getManager()->stats().cursorsSingleTarget);
}
// Test that killing a pinned cursor decrements the corresponding counter in stats().
TEST_F(ClusterCursorManagerTest, StatsKillPinnedCursor) {
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::MultiTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
auto pinnedCursor =
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker);
ASSERT_EQ(1U, getManager()->stats().cursorsPinned);
killCursorFromDifferentOpCtx(nss, cursorId);
- ASSERT_EQ(_opCtx->checkForInterruptNoAssert(), ErrorCodes::CursorKilled);
+ ASSERT_EQ(getOperationContext()->checkForInterruptNoAssert(), ErrorCodes::CursorKilled);
ASSERT_EQ(0U, getManager()->stats().cursorsPinned);
}
// Test that exhausting a sharded cursor decrements the corresponding counter in stats().
TEST_F(ClusterCursorManagerTest, StatsExhaustShardedCursor) {
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::MultiTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
auto pinnedCursor =
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker);
ASSERT_OK(pinnedCursor.getStatus());
ASSERT_OK(
pinnedCursor.getValue()->next(RouterExecStage::ExecContext::kInitialFind).getStatus());
@@ -678,14 +788,14 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustShardedCursor) {
// Test that exhausting a not-sharded cursor decrements the corresponding counter in stats().
TEST_F(ClusterCursorManagerTest, StatsExhaustNotShardedCursor) {
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
auto pinnedCursor =
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker);
ASSERT_OK(pinnedCursor.getStatus());
ASSERT_OK(
pinnedCursor.getValue()->next(RouterExecStage::ExecContext::kInitialFind).getStatus());
@@ -698,14 +808,14 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustNotShardedCursor) {
// stats().
TEST_F(ClusterCursorManagerTest, StatsExhaustPinnedCursor) {
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
auto pinnedCursor =
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker);
ASSERT_OK(pinnedCursor.getStatus());
ASSERT_OK(
pinnedCursor.getValue()->next(RouterExecStage::ExecContext::kInitialFind).getStatus());
@@ -718,14 +828,14 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustPinnedCursor) {
// stats().
TEST_F(ClusterCursorManagerTest, StatsCheckInWithoutExhaustingPinnedCursor) {
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
auto pinnedCursor =
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker);
ASSERT_OK(pinnedCursor.getStatus());
ASSERT_OK(
pinnedCursor.getValue()->next(RouterExecStage::ExecContext::kInitialFind).getStatus());
@@ -737,7 +847,7 @@ TEST_F(ClusterCursorManagerTest, StatsCheckInWithoutExhaustingPinnedCursor) {
// Test that getting the namespace for a cursor returns the correct namespace.
TEST_F(ClusterCursorManagerTest, GetNamespaceForCursorIdBasic) {
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -756,7 +866,7 @@ TEST_F(ClusterCursorManagerTest, GetNamespaceForCursorIdMultipleCursorsSameNames
std::vector<CursorId> cursorIds(numCursors);
for (size_t i = 0; i < numCursors; ++i) {
cursorIds[i] =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -779,7 +889,7 @@ TEST_F(ClusterCursorManagerTest, GetNamespaceForCursorIdMultipleCursorsDifferent
for (size_t i = 0; i < numCursors; ++i) {
NamespaceString cursorNamespace(std::string(str::stream() << "test.collection" << i));
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
cursorNamespace,
ClusterCursorManager::CursorType::SingleTarget,
@@ -811,21 +921,21 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorDefaultConstructor) {
// cursor.
TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorNotExhausted) {
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
auto registeredCursor =
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker);
ASSERT_OK(registeredCursor.getStatus());
ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId());
ASSERT_NE(0, cursorId);
registeredCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted);
ASSERT_EQ(0, registeredCursor.getValue().getCursorId());
auto checkedOutCursor =
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker);
ASSERT_OK(checkedOutCursor.getStatus());
}
@@ -833,14 +943,14 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorNotExhausted) {
// cursor, and leaves the pin owning no cursor.
TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhausted) {
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
auto registeredCursor =
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker);
ASSERT_OK(registeredCursor.getStatus());
ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId());
ASSERT_NE(0, cursorId);
@@ -850,8 +960,9 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhausted) {
ASSERT_EQ(0, registeredCursor.getValue().getCursorId());
// Cursor should have been killed and destroyed.
- ASSERT_NOT_OK(
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker).getStatus());
+ ASSERT_NOT_OK(getManager()
+ ->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker)
+ .getStatus());
ASSERT(isMockCursorKilled(0));
}
@@ -861,14 +972,14 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhaustedWithNonExhaust
auto mockCursor = allocateMockCursor();
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
std::move(mockCursor),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
auto registeredCursor =
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker);
ASSERT_OK(registeredCursor.getStatus());
ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId());
ASSERT_NE(0, cursorId);
@@ -879,22 +990,23 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhaustedWithNonExhaust
// Cursor should be killed as soon as it's checked in.
ASSERT(isMockCursorKilled(0));
- ASSERT_NOT_OK(
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker).getStatus());
+ ASSERT_NOT_OK(getManager()
+ ->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker)
+ .getStatus());
}
// Test that the PinnedCursor move assignment operator correctly kills the cursor if it has not yet
// been returned.
TEST_F(ClusterCursorManagerTest, PinnedCursorMoveAssignmentKill) {
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
auto pinnedCursor =
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker);
pinnedCursor = ClusterCursorManager::PinnedCursor();
ASSERT(isMockCursorKilled(0));
}
@@ -903,14 +1015,14 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorMoveAssignmentKill) {
TEST_F(ClusterCursorManagerTest, PinnedCursorDestructorKill) {
{
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
auto pinnedCursor =
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker);
}
ASSERT(isMockCursorKilled(0));
}
@@ -921,14 +1033,14 @@ TEST_F(ClusterCursorManagerTest, RemotesExhausted) {
ASSERT_FALSE(mockCursor->remotesExhausted());
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
std::move(mockCursor),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
auto pinnedCursor =
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker);
ASSERT_OK(pinnedCursor.getStatus());
ASSERT_FALSE(pinnedCursor.getValue()->remotesExhausted());
}
@@ -937,23 +1049,23 @@ TEST_F(ClusterCursorManagerTest, RemotesExhausted) {
TEST_F(ClusterCursorManagerTest, DoNotDestroyKilledPinnedCursors) {
const Date_t cutoff = getClockSource()->now();
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
auto pinnedCursor =
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker);
ASSERT_OK(pinnedCursor.getStatus());
killCursorFromDifferentOpCtx(nss, cursorId);
- ASSERT_EQ(_opCtx->checkForInterruptNoAssert(), ErrorCodes::CursorKilled);
+ ASSERT_EQ(getOperationContext()->checkForInterruptNoAssert(), ErrorCodes::CursorKilled);
ASSERT(!isMockCursorKilled(0));
// The cursor cleanup system should not destroy the cursor either.
- getManager()->killMortalCursorsInactiveSince(_opCtx.get(), cutoff);
+ getManager()->killMortalCursorsInactiveSince(getOperationContext(), cutoff);
// The cursor's operation context should be marked as interrupted, but the cursor itself should
// not have been destroyed.
@@ -975,14 +1087,14 @@ TEST_F(ClusterCursorManagerTest, CursorStoresAPIParameters) {
cursor->setAPIParameters(apiParams);
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
std::move(cursor),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
- auto pinnedCursor =
- assertGet(getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker));
+ auto pinnedCursor = assertGet(
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker));
auto storedAPIParams = pinnedCursor->getAPIParameters();
ASSERT_EQ("2", *storedAPIParams.getAPIVersion());
@@ -991,7 +1103,7 @@ TEST_F(ClusterCursorManagerTest, CursorStoresAPIParameters) {
}
TEST_F(ClusterCursorManagerTest, CannotRegisterCursorDuringShutdown) {
- ASSERT_OK(getManager()->registerCursor(_opCtx.get(),
+ ASSERT_OK(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -999,12 +1111,12 @@ TEST_F(ClusterCursorManagerTest, CannotRegisterCursorDuringShutdown) {
UserNameIterator()));
ASSERT(!isMockCursorKilled(0));
- getManager()->shutdown(_opCtx.get());
+ getManager()->shutdown(getOperationContext());
ASSERT(isMockCursorKilled(0));
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress,
- getManager()->registerCursor(_opCtx.get(),
+ getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -1014,7 +1126,7 @@ TEST_F(ClusterCursorManagerTest, CannotRegisterCursorDuringShutdown) {
TEST_F(ClusterCursorManagerTest, PinnedCursorNotKilledOnShutdown) {
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -1022,10 +1134,10 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorNotKilledOnShutdown) {
UserNameIterator()));
auto pinnedCursor =
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
- getManager()->shutdown(_opCtx.get());
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker);
+ getManager()->shutdown(getOperationContext());
- ASSERT_EQ(_opCtx->checkForInterruptNoAssert(), ErrorCodes::CursorKilled);
+ ASSERT_EQ(getOperationContext()->checkForInterruptNoAssert(), ErrorCodes::CursorKilled);
ASSERT(!isMockCursorKilled(0));
// Even if it's checked back in as not exhausted, it should have been marked as killed when
@@ -1036,7 +1148,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorNotKilledOnShutdown) {
TEST_F(ClusterCursorManagerTest, CannotCheckoutCursorDuringShutdown) {
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -1044,13 +1156,14 @@ TEST_F(ClusterCursorManagerTest, CannotCheckoutCursorDuringShutdown) {
UserNameIterator()));
ASSERT(!isMockCursorKilled(0));
- getManager()->shutdown(_opCtx.get());
+ getManager()->shutdown(getOperationContext());
ASSERT(isMockCursorKilled(0));
- ASSERT_EQUALS(
- ErrorCodes::ShutdownInProgress,
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker).getStatus());
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress,
+ getManager()
+ ->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker)
+ .getStatus());
}
/**
@@ -1058,7 +1171,7 @@ TEST_F(ClusterCursorManagerTest, CannotCheckoutCursorDuringShutdown) {
*/
TEST_F(ClusterCursorManagerTest, CursorsWithoutSessions) {
// Add a cursor with no session to the cursor manager.
- ASSERT_OK(getManager()->registerCursor(_opCtx.get(),
+ ASSERT_OK(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -1078,7 +1191,7 @@ TEST_F(ClusterCursorManagerTest, OneCursorWithASession) {
// Add a cursor with a session to the cursor manager.
auto lsid = makeLogicalSessionIdForTest();
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(lsid),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -1097,7 +1210,7 @@ TEST_F(ClusterCursorManagerTest, OneCursorWithASession) {
ASSERT(cursors.find(cursorId) != cursors.end());
// Remove the cursor from the manager.
- ASSERT_OK(getManager()->killCursor(_opCtx.get(), nss, cursorId));
+ ASSERT_OK(getManager()->killCursor(getOperationContext(), nss, cursorId));
// There should be no more cursor entries by session id.
LogicalSessionIdSet sessions;
@@ -1113,7 +1226,7 @@ TEST_F(ClusterCursorManagerTest, GetSessionIdsWhileCheckedOut) {
// Add a cursor with a session to the cursor manager.
auto lsid = makeLogicalSessionIdForTest();
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(lsid),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -1121,7 +1234,8 @@ TEST_F(ClusterCursorManagerTest, GetSessionIdsWhileCheckedOut) {
UserNameIterator()));
// Check the cursor out, then try to append cursors, see that we get one.
- auto res = getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
+ auto res =
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker);
ASSERT(res.isOK());
auto cursors = getManager()->getCursorsForSession(lsid);
@@ -1135,14 +1249,14 @@ TEST_F(ClusterCursorManagerTest, MultipleCursorsWithSameSession) {
// Add two cursors on the same session to the cursor manager.
auto lsid = makeLogicalSessionIdForTest();
auto cursorId1 =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(lsid),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
auto cursorId2 =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(lsid),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -1162,7 +1276,7 @@ TEST_F(ClusterCursorManagerTest, MultipleCursorsWithSameSession) {
ASSERT(cursors.find(cursorId2) != cursors.end());
// Remove one cursor from the manager.
- ASSERT_OK(getManager()->killCursor(_opCtx.get(), nss, cursorId1));
+ ASSERT_OK(getManager()->killCursor(getOperationContext(), nss, cursorId1));
// Should still be able to retrieve the session.
lsids.clear();
@@ -1185,7 +1299,7 @@ TEST_F(ClusterCursorManagerTest, MultipleCursorsMultipleSessions) {
// Register two cursors with different lsids, and one without.
CursorId cursor1 =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(lsid1),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -1193,14 +1307,14 @@ TEST_F(ClusterCursorManagerTest, MultipleCursorsMultipleSessions) {
UserNameIterator()));
CursorId cursor2 =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(lsid2),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
- ASSERT_OK(getManager()->registerCursor(_opCtx.get(),
+ ASSERT_OK(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -1231,7 +1345,7 @@ TEST_F(ClusterCursorManagerTest, ManyCursorsManySessions) {
const int count = 10000;
for (int i = 0; i < count; i++) {
auto lsid = makeLogicalSessionIdForTest();
- ASSERT_OK(getManager()->registerCursor(_opCtx.get(),
+ ASSERT_OK(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(lsid),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -1247,26 +1361,27 @@ TEST_F(ClusterCursorManagerTest, ManyCursorsManySessions) {
TEST_F(ClusterCursorManagerTest, CheckAuthForKillCursors) {
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
- ASSERT_EQ(
- ErrorCodes::CursorNotFound,
- getManager()->checkAuthForKillCursors(_opCtx.get(), nss, cursorId + 1, successAuthChecker));
+ ASSERT_EQ(ErrorCodes::CursorNotFound,
+ getManager()->checkAuthForKillCursors(
+ getOperationContext(), nss, cursorId + 1, successAuthChecker));
ASSERT_EQ(ErrorCodes::Unauthorized,
- getManager()->checkAuthForKillCursors(_opCtx.get(), nss, cursorId, failAuthChecker));
- ASSERT_OK(
- getManager()->checkAuthForKillCursors(_opCtx.get(), nss, cursorId, successAuthChecker));
+ getManager()->checkAuthForKillCursors(
+ getOperationContext(), nss, cursorId, failAuthChecker));
+ ASSERT_OK(getManager()->checkAuthForKillCursors(
+ getOperationContext(), nss, cursorId, successAuthChecker));
}
TEST_F(ClusterCursorManagerTest, PinnedCursorReturnsUnderlyingCursorTxnNumber) {
const TxnNumber txnNumber = 5;
auto cursorId = assertGet(
- getManager()->registerCursor(_opCtx.get(),
+ getManager()->registerCursor(getOperationContext(),
allocateMockCursor(makeLogicalSessionIdForTest(), txnNumber),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -1274,7 +1389,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnsUnderlyingCursorTxnNumber) {
UserNameIterator()));
auto pinnedCursor =
- getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker);
ASSERT_OK(pinnedCursor.getStatus());
// The underlying cursor's txnNumber should be returned.
@@ -1283,7 +1398,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnsUnderlyingCursorTxnNumber) {
}
TEST_F(ClusterCursorManagerTest, CursorsWithoutOperationKeys) {
- ASSERT_OK(getManager()->registerCursor(_opCtx.get(),
+ ASSERT_OK(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -1295,9 +1410,9 @@ TEST_F(ClusterCursorManagerTest, CursorsWithoutOperationKeys) {
TEST_F(ClusterCursorManagerTest, OneCursorWithAnOperationKey) {
auto opKey = UUID::gen();
- _opCtx->setOperationKey(opKey);
+ getOperationContext()->setOperationKey(opKey);
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -1310,7 +1425,7 @@ TEST_F(ClusterCursorManagerTest, OneCursorWithAnOperationKey) {
ASSERT(cursors.find(cursorId) != cursors.end());
// Remove the cursor from the manager.
- ASSERT_OK(getManager()->killCursor(_opCtx.get(), nss, cursorId));
+ ASSERT_OK(getManager()->killCursor(getOperationContext(), nss, cursorId));
// There should be no more cursor entries for this operation key.
ASSERT(getManager()->getCursorsForOpKeys({opKey}).empty());
@@ -1318,9 +1433,9 @@ TEST_F(ClusterCursorManagerTest, OneCursorWithAnOperationKey) {
TEST_F(ClusterCursorManagerTest, GetCursorByOpKeyWhileCheckedOut) {
auto opKey = UUID::gen();
- _opCtx->setOperationKey(opKey);
+ getOperationContext()->setOperationKey(opKey);
auto cursorId =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -1328,7 +1443,8 @@ TEST_F(ClusterCursorManagerTest, GetCursorByOpKeyWhileCheckedOut) {
UserNameIterator()));
// Check the cursor out then look it up by operation key.
- auto res = getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
+ auto res =
+ getManager()->checkOutCursor(nss, cursorId, getOperationContext(), successAuthChecker);
ASSERT(res.isOK());
auto cursors = getManager()->getCursorsForOpKeys({opKey});
@@ -1337,16 +1453,16 @@ TEST_F(ClusterCursorManagerTest, GetCursorByOpKeyWhileCheckedOut) {
TEST_F(ClusterCursorManagerTest, MultipleCursorsWithSameOperationKey) {
auto opKey = UUID::gen();
- _opCtx->setOperationKey(opKey);
+ getOperationContext()->setOperationKey(opKey);
auto cursorId1 =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
UserNameIterator()));
auto cursorId2 =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
@@ -1360,7 +1476,7 @@ TEST_F(ClusterCursorManagerTest, MultipleCursorsWithSameOperationKey) {
ASSERT(cursors.find(cursorId2) != cursors.end());
// Remove one cursor from the manager.
- ASSERT_OK(getManager()->killCursor(_opCtx.get(), nss, cursorId1));
+ ASSERT_OK(getManager()->killCursor(getOperationContext(), nss, cursorId1));
// Should still be able to retrieve remaining cursor by session.
cursors = getManager()->getCursorsForOpKeys({opKey});
@@ -1371,7 +1487,7 @@ TEST_F(ClusterCursorManagerTest, MultipleCursorsWithSameOperationKey) {
TEST_F(ClusterCursorManagerTest, MultipleCursorsMultipleOperationKeys) {
auto opKey1 = UUID::gen();
auto opKey2 = UUID::gen();
- _opCtx->setOperationKey(opKey1);
+ getOperationContext()->setOperationKey(opKey1);
auto client2 = getServiceContext()->makeClient("client2");
auto opCtx2 = client2->makeOperationContext();
@@ -1379,7 +1495,7 @@ TEST_F(ClusterCursorManagerTest, MultipleCursorsMultipleOperationKeys) {
// Register two cursors with different operation keys.
CursorId cursor1 =
- assertGet(getManager()->registerCursor(_opCtx.get(),
+ assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
diff --git a/src/mongo/s/service_entry_point_mongos.cpp b/src/mongo/s/service_entry_point_mongos.cpp
index c9ec249b0f7..7e07d31c688 100644
--- a/src/mongo/s/service_entry_point_mongos.cpp
+++ b/src/mongo/s/service_entry_point_mongos.cpp
@@ -49,6 +49,9 @@
#include "mongo/rpc/warn_deprecated_wire_ops.h"
#include "mongo/s/cluster_last_error_info.h"
#include "mongo/s/commands/strategy.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/load_balancer_support.h"
+#include "mongo/s/query/cluster_cursor_manager.h"
namespace mongo {
@@ -288,4 +291,14 @@ Future<DbResponse> ServiceEntryPointMongos::handleRequest(OperationContext* opCt
return hr->run();
}
+void ServiceEntryPointMongos::onClientDisconnect(Client* client) {
+ if (load_balancer_support::isFromLoadBalancer(client)) {
+ auto ccm = Grid::get(client->getServiceContext())->getCursorManager();
+ auto killerOperationContext = client->makeOperationContext();
+ ccm->killCursorsSatisfying(killerOperationContext.get(),
+ [&](CursorId, const ClusterCursorManager::CursorEntry& entry) {
+ return entry.originatingClientUuid() == client->getUUID();
+ });
+ }
+}
} // namespace mongo
diff --git a/src/mongo/s/service_entry_point_mongos.h b/src/mongo/s/service_entry_point_mongos.h
index 17d99fa3a01..444f89eef4c 100644
--- a/src/mongo/s/service_entry_point_mongos.h
+++ b/src/mongo/s/service_entry_point_mongos.h
@@ -46,6 +46,8 @@ public:
using ServiceEntryPointImpl::ServiceEntryPointImpl;
Future<DbResponse> handleRequest(OperationContext* opCtx,
const Message& request) noexcept override;
+
+ void onClientDisconnect(Client* client) override;
};
} // namespace mongo
diff --git a/src/mongo/transport/service_entry_point.h b/src/mongo/transport/service_entry_point.h
index 3e8d279b7b9..44786921474 100644
--- a/src/mongo/transport/service_entry_point.h
+++ b/src/mongo/transport/service_entry_point.h
@@ -32,6 +32,7 @@
#include <limits>
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/client.h"
#include "mongo/db/dbmessage.h"
#include "mongo/transport/session.h"
#include "mongo/util/future.h"
@@ -102,6 +103,13 @@ public:
*/
virtual void onEndSession(const transport::SessionHandle&) {}
+ /**
+ * Optional handler which is invoked after a client disconnect. A client disconnect occurs when
+ * the connection between the mongo process and client is closed for any reason, and is defined
+ * by the destruction and cleanup of the ServiceStateMachine that manages the client.
+ */
+ virtual void onClientDisconnect(Client* client) {}
+
protected:
ServiceEntryPoint() = default;
};
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index 94616fed5c3..492ede06f9f 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -644,9 +644,10 @@ void ServiceStateMachine::Impl::cleanupSession(const Status& status) {
LOGV2_DEBUG(5127900, 2, "Ending session", "error"_attr = status);
cleanupExhaustResources();
+ auto client = _clientStrand->getClientPointer();
+ _sep->onClientDisconnect(client);
{
- auto client = _clientStrand->getClientPointer();
stdx::lock_guard lk(*client);
transport::ServiceExecutorContext::reset(client);
}
diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp
index cae5c81e41d..88485924b76 100644
--- a/src/mongo/transport/service_state_machine_test.cpp
+++ b/src/mongo/transport/service_state_machine_test.cpp
@@ -258,6 +258,10 @@ public:
return _data->isConnected.load();
}
+ int onClientDisconnectCalledTimes() const {
+ return _onClientDisconnectCalled;
+ }
+
friend constexpr StringData toString(FailureCondition fail) {
switch (fail) {
case FailureCondition::kNone:
@@ -452,6 +456,10 @@ private:
_stateQueue.push(IngressState::kEnd);
}
+ void _onClientDisconnect() {
+ ++_onClientDisconnectCalled;
+ }
+
const boost::optional<ServiceExecutor::ThreadingModel> _threadingModel;
boost::optional<ServiceExecutor::ThreadingModel> _originalThreadingModel;
@@ -463,6 +471,8 @@ private:
std::shared_ptr<ServiceStateMachineTest::Session> _session;
SingleProducerSingleConsumerQueue<IngressState> _stateQueue;
+
+ int _onClientDisconnectCalled{0};
};
/**
@@ -552,6 +562,11 @@ public:
_fixture->_cleanup(session);
}
+ void onClientDisconnect(Client* client) override {
+ invariant(client);
+ _fixture->_onClientDisconnect();
+ }
+
private:
ServiceStateMachineTest* const _fixture;
};
@@ -794,6 +809,17 @@ TEST_F(ServiceStateMachineTest, EndBeforeStartSession) {
startSession();
}
+TEST_F(ServiceStateMachineTest, OnClientDisconnectCalledOnCleanup) {
+ initNewSession();
+ startSession();
+ ASSERT_EQ(popIngressState(), IngressState::kSource);
+ ASSERT_EQ(onClientDisconnectCalledTimes(), 0);
+ endSession();
+ ASSERT_EQ(popIngressState(), IngressState::kEnd);
+ joinSession();
+ ASSERT_EQ(onClientDisconnectCalledTimes(), 1);
+}
+
TEST_F(ServiceStateMachineWithDedicatedThreadsTest, DefaultLoop) {
auto runner = StepRunner(this);