diff options
-rw-r--r-- | jstests/noPassthrough/agg_cursor_timeout.js | 142 | ||||
-rw-r--r-- | jstests/sharding/cursor_timeout.js | 51 | ||||
-rw-r--r-- | src/mongo/db/cursor_manager.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/query/query_knobs.idl | 7 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_cursor_manager.cpp | 10 |
5 files changed, 210 insertions, 4 deletions
diff --git a/jstests/noPassthrough/agg_cursor_timeout.js b/jstests/noPassthrough/agg_cursor_timeout.js new file mode 100644 index 00000000000..b82e1e44384 --- /dev/null +++ b/jstests/noPassthrough/agg_cursor_timeout.js @@ -0,0 +1,142 @@ +/** + * Tests that an aggregation cursor is killed when it is timed out by the ClientCursorMonitor. + * + * This test was designed to reproduce SERVER-25585. + */ +(function() { +'use strict'; + +// Cursor timeout on mongod is handled by a single thread/timer that wakes up every +// "clientCursorMonitorFrequencySecs" and times out each cursor whose idle time ("now() - last +// use") has reached "cursorTimeoutMillis". With a 2 second timeout and a 1 second monitor +// frequency, an idle cursor should therefore be reaped within roughly one monitor period of +// crossing the 2 second mark. +const cursorTimeoutMs = 2000; +const cursorMonitorFrequencySecs = 1; + +const options = { + setParameter: { + internalDocumentSourceCursorBatchSizeBytes: 1, + // We use the "cursorTimeoutMillis" server parameter to decrease how long it takes for a + // non-exhausted cursor to time out. We use the "clientCursorMonitorFrequencySecs" + // server parameter to make the ClientCursorMonitor that cleans up the timed out cursors + // run more often. The combination of these server parameters reduces the amount of time + // we need to wait within this test. + cursorTimeoutMillis: cursorTimeoutMs, + clientCursorMonitorFrequencySecs: cursorMonitorFrequencySecs, + } +}; +const conn = MongoRunner.runMongod(options); +assert.neq(null, conn, 'mongod was unable to start up with options: ' + tojson(options)); + +const testDB = conn.getDB('test'); + +// We use a batch size of 2 to ensure that the mongo shell does not exhaust the cursor on its +// first batch. 
+const batchSize = 2; +const numMatches = 5; + +function assertCursorTimesOutImpl(collName, pipeline) { + const res = assert.commandWorked(testDB.runCommand({ + aggregate: collName, + pipeline: pipeline, + cursor: { + batchSize: batchSize, + }, + })); + + let serverStatus = assert.commandWorked(testDB.serverStatus()); + const expectedNumTimedOutCursors = serverStatus.metrics.cursor.timedOut + 1; + + const cursor = new DBCommandCursor(testDB, res, batchSize); + + // Wait until the idle cursor background job has killed the aggregation cursor. + assert.soon( + function() { + serverStatus = assert.commandWorked(testDB.serverStatus()); + return +serverStatus.metrics.cursor.timedOut === expectedNumTimedOutCursors; + }, + function() { + return "aggregation cursor failed to time out: " + tojson(serverStatus.metrics.cursor); + }); + + assert.eq(0, serverStatus.metrics.cursor.open.total, tojson(serverStatus)); + + // We attempt to exhaust the aggregation cursor to verify that sending a getMore returns an + // error due to the cursor being killed. + let err = assert.throws(function() { + cursor.itcount(); + }); + assert.eq(ErrorCodes.CursorNotFound, err.code, tojson(err)); +} + +function assertCursorTimesOut(collName, pipeline) { + // Confirm that cursor timeout occurs outside of sessions. + TestData.disableImplicitSessions = true; + assertCursorTimesOutImpl(collName, pipeline); + TestData.disableImplicitSessions = false; + + // Confirm that cursor timeout occurs within sessions when the + // `enableTimeoutOfInactiveSessionCursors` parameter is set to true. If false, we rely on + // session expiration to clean up outstanding cursors. 
+ assert.commandWorked( + testDB.adminCommand({setParameter: 1, enableTimeoutOfInactiveSessionCursors: true})); + assertCursorTimesOutImpl(collName, pipeline); + assert.commandWorked( + testDB.adminCommand({setParameter: 1, enableTimeoutOfInactiveSessionCursors: false})); +} + +assert.commandWorked(testDB.source.insert({local: 1})); +for (let i = 0; i < numMatches; ++i) { + assert.commandWorked(testDB.dest.insert({foreign: 1})); +} + +// Test that a regular aggregation cursor is killed when the timeout is reached. +assertCursorTimesOut('dest', []); + +// Test that an aggregation cursor with a $lookup stage is killed when the timeout is reached. +assertCursorTimesOut('source', [ + { + $lookup: { + from: 'dest', + localField: 'local', + foreignField: 'foreign', + as: 'matches', + } + }, + { + $unwind: "$matches", + }, + ]); + +// Test that an aggregation cursor with nested $lookup stages is killed when the timeout is +// reached. +assertCursorTimesOut('source', [ + { + $lookup: { + from: 'dest', + let : {local1: "$local"}, + pipeline: [ + {$match: {$expr: {$eq: ["$foreign", "$$local1"]}}}, + { + $lookup: { + from: 'source', + let : {foreign1: "$foreign"}, + pipeline: [{$match: {$expr: {$eq: ["$local", "$$foreign1"]}}}], + as: 'matches2' + } + }, + { + $unwind: "$matches2", + }, + ], + as: 'matches1', + } + }, + { + $unwind: "$matches1", + }, + ]); + +MongoRunner.stopMongod(conn); +})(); diff --git a/jstests/sharding/cursor_timeout.js b/jstests/sharding/cursor_timeout.js index 33ac660703e..eb506a603a2 100644 --- a/jstests/sharding/cursor_timeout.js +++ b/jstests/sharding/cursor_timeout.js @@ -14,7 +14,7 @@ // the session cursors #5 and #6 are attached to to simulate that session timing out, and ensures // that cursors #5 and #6 are killed as a result. 
// -// @tags: [requires_sharding, requires_find_command, requires_fcv_44] +// @tags: [requires_sharding, requires_find_command, requires_fcv_40] (function() { 'use strict'; @@ -170,5 +170,54 @@ assert.eq(killRes.cursorsUnknown, []); assert.eq(routerColl.count(), routerCursorWithNoTimeout.itcount() + 1); assert.eq(shardColl.count(), shardCursorWithNoTimeout.itcount() + 1); +// Confirm that cursors opened within a session will time out when the +// 'enableTimeoutOfInactiveSessionCursors' setParameter has been enabled. +(function() { +assert.commandWorked( + mongosDB.adminCommand({setParameter: 1, enableTimeoutOfInactiveSessionCursors: true})); +assert.commandWorked( + shardDB.adminCommand({setParameter: 1, enableTimeoutOfInactiveSessionCursors: true})); + +// Open a session on mongos. +routerSession = mongosDB.getMongo().startSession(); +routerSessionDB = routerSession.getDatabase(mongosDB.getName()); +routerSessionCursor = routerSessionDB.user.find().batchSize(1); +const numRouterCursorsTimedOut = routerColl.getDB().serverStatus().metrics.cursor.timedOut; + +// Open a session on mongod. +shardSession = shardDB.getMongo().startSession(); +shardSessionDB = shardSession.getDatabase(shardDB.getName()); +shardSessionCursor = shardSessionDB.user.find().batchSize(1); +const numShardCursorsTimedOut = shardColl.getDB().serverStatus().metrics.cursor.timedOut; + +// Execute initial find on each cursor. +routerSessionCursor.next(); +shardSessionCursor.next(); + +// Wait until mongos reflects the newly timed out cursors. +assert.soon(function() { + return routerColl.getDB().serverStatus().metrics.cursor.timedOut >= + (numRouterCursorsTimedOut + 1); +}, "router cursor failed to time out"); + +// Wait until mongod reflects the newly timed out cursors. 
+assert.soon(function() { + return shardColl.getDB().serverStatus().metrics.cursor.timedOut >= + (numShardCursorsTimedOut + 1); +}, "sharded cursor failed to time out"); + +assert.throws(function() { + routerSessionCursor.itcount(); +}); +assert.throws(function() { + shardSessionCursor.itcount(); +}); + +assert.commandWorked( + mongosDB.adminCommand({setParameter: 1, enableTimeoutOfInactiveSessionCursors: false})); +assert.commandWorked( + shardDB.adminCommand({setParameter: 1, enableTimeoutOfInactiveSessionCursors: false})); +})(); + st.stop(); })(); diff --git a/src/mongo/db/cursor_manager.cpp b/src/mongo/db/cursor_manager.cpp index bd2800964fb..30022109077 100644 --- a/src/mongo/db/cursor_manager.cpp +++ b/src/mongo/db/cursor_manager.cpp @@ -51,6 +51,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/query/plan_executor.h" +#include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/service_context.h" #include "mongo/logv2/log.h" #include "mongo/platform/random.h" @@ -120,7 +121,8 @@ CursorManager::~CursorManager() { } bool CursorManager::cursorShouldTimeout_inlock(const ClientCursor* cursor, Date_t now) { - if (cursor->isNoTimeout() || cursor->_operationUsingCursor || cursor->getSessionId()) { + if (cursor->isNoTimeout() || cursor->_operationUsingCursor || + (cursor->getSessionId() && !enableTimeoutOfInactiveSessionCursors.load())) { return false; } return (now - cursor->_lastUseDate) >= Milliseconds(getCursorTimeoutMillis()); diff --git a/src/mongo/db/query/query_knobs.idl b/src/mongo/db/query/query_knobs.idl index ddccc7ff8a3..d68433340a0 100644 --- a/src/mongo/db/query/query_knobs.idl +++ b/src/mongo/db/query/query_knobs.idl @@ -422,3 +422,10 @@ server_parameters: cpp_varname: "enableSearchMeta" cpp_vartype: AtomicWord<bool> default: false + + enableTimeoutOfInactiveSessionCursors: + description: "If true, cursors opened within sessions are eligible for inactive cursor timeout." 
+ set_at: [ startup, runtime ] + cpp_varname: "enableTimeoutOfInactiveSessionCursors" + cpp_vartype: AtomicWord<bool> + default: false diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp index 64fcb8e4d87..4c621f15e23 100644 --- a/src/mongo/s/query/cluster_cursor_manager.cpp +++ b/src/mongo/s/query/cluster_cursor_manager.cpp @@ -38,6 +38,7 @@ #include "mongo/db/kill_sessions_common.h" #include "mongo/db/logical_session_cache.h" +#include "mongo/db/query/query_knobs_gen.h" #include "mongo/logv2/log.h" #include "mongo/util/clock_source.h" #include "mongo/util/str.h" @@ -411,8 +412,13 @@ std::size_t ClusterCursorManager::killMortalCursorsInactiveSince(OperationContex stdx::unique_lock<Latch> lk(_mutex); auto pred = [cutoff](CursorId cursorId, const CursorEntry& entry) -> bool { - bool res = entry.getLifetimeType() == CursorLifetime::Mortal && !entry.getLsid() && - !entry.getOperationUsingCursor() && entry.getLastActive() <= cutoff; + if (entry.getLifetimeType() == CursorLifetime::Immortal || + entry.getOperationUsingCursor() || + (entry.getLsid() && !enableTimeoutOfInactiveSessionCursors.load())) { + return false; + } + + bool res = entry.getLastActive() <= cutoff; if (res) { LOGV2(22837, |