summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author George Wangensteen <george.wangensteen@mongodb.com> 2022-08-15 19:58:34 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-08-16 14:50:42 +0000
commit 954ae7c6e4d33766170f767a0ad4807b8a7b215e (patch)
tree 48659bd7f29563ba3100024dfd9d1195ee5015cb
parent 0f0d3670dc5568157d6230eead322389a8e2d60b (diff)
download mongo-954ae7c6e4d33766170f767a0ad4807b8a7b215e.tar.gz
SERVER-68039 Support cleaning up dbQuery/dbGetMore exhaust cursors safely
(cherry picked from commit effae26eda3c0f4f71d1e7b48bb1291497dab25d)
-rw-r--r-- src/mongo/db/exhaust_cursor_currentop_integration_test.cpp 522
-rw-r--r-- src/mongo/db/service_entry_point_common.cpp 5
-rw-r--r-- src/mongo/transport/service_state_machine.cpp 16
3 files changed, 320 insertions, 223 deletions
diff --git a/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp b/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp
index fae024557dc..4b701ea3439 100644
--- a/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp
+++ b/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp
@@ -43,169 +43,317 @@
namespace mongo {
namespace {
-// Specifies the amount of time we are willing to wait for a parallel operation to complete.
-const auto parallelWaitTimeoutMS = Milliseconds(5 * 60 * 1000);
-// Obtain a pointer to the global system clock. Used to enforce timeouts in the parallel thread.
-auto* const clock = SystemClockSource::get();
+// Test fixture for the exhaust-cursor $currentOp integration tests. Several helpers in this
+// fixture branch on '_op' (dbMsg unless changed, or dbQuery), so a test can choose which kind of
+// exhaust request it exercises.
+class CurrentOpExhaustCursorTestFixture : public unittest::Test {
+public:
+ // Leaves '_op' at its in-class default.
+ CurrentOpExhaustCursorTestFixture() = default;
+ // Selects the network op that the fixture helpers should exercise.
+ explicit CurrentOpExhaustCursorTestFixture(NetworkOp op) : _op{op} {}
-const NamespaceString testNSS{"exhaust_cursor_currentop.testColl"};
+ // Connects to the fixture deployment, identifying this client with the default test
+ // application name.
+ std::unique_ptr<DBClientBase> connect() {
+ const StringData defaultAppName = testAppName;
+ return connect(defaultAppName);
+ }
-const StringData testAppName = "curop_exhaust_cursor_test";
-std::unique_ptr<DBClientBase> connect(StringData appName = testAppName) {
- std::string errMsg;
- auto conn = unittest::getFixtureConnectionString().connect(appName.toString(), errMsg);
- uassert(ErrorCodes::SocketException, errMsg, conn);
- return conn;
-}
-const StringData testBackgroundAppName = "curop_exhaust_cursor_test_bg";
+ // Connects to the fixture connection string under the given application name. If no
+ // connection is returned, a SocketException carrying the reported error message is raised.
+ std::unique_ptr<DBClientBase> connect(StringData appName) {
+ std::string connectError;
+ auto fixtureConnStr = unittest::getFixtureConnectionString();
+ auto client = fixtureConnStr.connect(appName.toString(), connectError);
+ uassert(ErrorCodes::SocketException, connectError, client);
+ return client;
+ }
-void initTestCollection(DBClientBase* conn) {
- // Drop and recreate the test namespace.
- conn->dropCollection(testNSS.ns());
- for (int i = 0; i < 10; i++) {
- auto insertCmd =
- BSON("insert" << testNSS.coll() << "documents" << BSON_ARRAY(BSON("a" << i)));
- auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody(testNSS.db(), insertCmd));
- ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply()));
+ // Drops the test collection and repopulates it with a data set suited to the op under test.
+ //
+ // For dbMsg, ten small documents {a: 0} .. {a: 9} are inserted. For dbQuery, 3000 documents
+ // that each carry a 16KB string are inserted. For any other op the collection is left dropped
+ // and nothing is inserted.
+ void initTestCollection(DBClientBase* conn) {
+ // Drop and recreate the test namespace.
+ conn->dropCollection(testNSS.ns());
+ // Inserts one document into the test collection and asserts that the command succeeded.
+ const auto insertDoc = [&](const BSONObj& doc) {
+ auto insertCmd = BSON("insert" << testNSS.coll() << "documents" << BSON_ARRAY(doc));
+ auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody(testNSS.db(), insertCmd));
+ ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply()));
+ };
+ if (_op == dbMsg) {
+ for (int i = 0; i < 10; i++) {
+ insertDoc(BSON("a" << i));
+ }
+ } else if (_op == dbQuery) {
+ // For dbQuery exhaust, the getMore batch size is fixed at the server-defined
+ // limit of 16MB, and can't be manipulated. We need to insert enough documents
+ // to make sure we have multiple batches of getMores given that limit;
+ // as a result, we create approximately 16KB documents and insert 3000 of them
+ // to ensure that three getMores are required to exhaust the cursor.
+ constexpr int numDocs = 3000;
+ constexpr int strSize = 16 * 1024;
+ // Use the (count, char) constructor. The (const char*, count) form would copy 'strSize'
+ // bytes starting at the literal "A", reading far past the end of that 2-byte array.
+ // The payload is identical for every document, so build it once outside the loop.
+ const std::string bigString(strSize, 'A');
+ for (int i = 0; i < numDocs; i++) {
+ insertDoc(BSON("a" << i << "data" << bigString));
+ }
+ }
}
-}
-void setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(DBClientBase* conn, bool enable) {
- auto cmdObj = BSON("configureFailPoint"
- << "waitWithPinnedCursorDuringGetMoreBatch"
- << "mode" << (enable ? "alwaysOn" : "off") << "data"
- << BSON("shouldNotdropLock" << true << "shouldContinueOnInterrupt" << true));
- auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody("admin", cmdObj));
- ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply()));
-}
+ // Switches the 'waitWithPinnedCursorDuringGetMoreBatch' failpoint to "alwaysOn" or "off" via
+ // an admin command on 'conn', and asserts that the command succeeded. The same 'data'
+ // options are sent in both cases.
+ void setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(DBClientBase* conn, bool enable) {
+ const char* const fpMode = enable ? "alwaysOn" : "off";
+ const auto fpData =
+ BSON("shouldNotdropLock" << true << "shouldContinueOnInterrupt" << true);
+ const auto configureCmd = BSON("configureFailPoint"
+ << "waitWithPinnedCursorDuringGetMoreBatch"
+ << "mode" << fpMode << "data" << fpData);
+ auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody("admin", configureCmd));
+ ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply()));
+ }
-void setWaitAfterCommandFinishesExecutionFailpoint(DBClientBase* conn, bool enable) {
- auto cmdObj = BSON("configureFailPoint"
- << "waitAfterCommandFinishesExecution"
- << "mode" << (enable ? "alwaysOn" : "off") << "data"
- << BSON("ns" << testNSS.toString()));
- auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody("admin", cmdObj));
- ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply()));
-}
+ // Switches the post-getMore failpoint that matches the op under test to "alwaysOn" or "off",
+ // scoped to the test namespace, and returns the name of the failpoint that was configured.
+ std::string setWaitAfterGetMoreFinishesExecutionFailpoint(DBClientBase* conn, bool enable) {
+ // A dbMsg getMore runs through the command path while the other op runs through the
+ // OP_GET_MORE path, and each path has its own post-execution failpoint.
+ std::string failPointName = "waitAfterOpGetMoreFinishesExecution";
+ if (_op == dbMsg) {
+ failPointName = "waitAfterCommandFinishesExecution";
+ }
+ const char* const fpMode = enable ? "alwaysOn" : "off";
+ const auto fpData = BSON("ns" << testNSS.toString());
+ const auto configureCmd = BSON("configureFailPoint" << failPointName << "mode" << fpMode
+ << "data" << fpData);
+ auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody("admin", configureCmd));
+ ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply()));
+ // Hand the chosen name back so the caller can match it against the 'msg' field reported
+ // by $currentOp.
+ return failPointName;
+ }
-void setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(DBClientBase* conn,
- bool enable) {
- auto cmdObj = BSON("configureFailPoint"
- << "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch"
- << "mode" << (enable ? "alwaysOn" : "off"));
- auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody("admin", cmdObj));
- ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply()));
-}
+ // Switches the 'waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch' failpoint to "alwaysOn"
+ // or "off" via an admin command on 'conn', and asserts that the command succeeded.
+ void setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(DBClientBase* conn,
+ bool enable) {
+ const char* const fpMode = enable ? "alwaysOn" : "off";
+ const auto configureCmd = BSON("configureFailPoint"
+ << "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch"
+ << "mode" << fpMode);
+ auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody("admin", configureCmd));
+ ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply()));
+ }
-bool confirmCurrentOpContents(DBClientBase* conn,
- BSONObj curOpMatch,
- bool expectEmptyResult = false,
- Milliseconds timeoutMS = Milliseconds(5 * 60 * 1000),
- Milliseconds intervalMS = Milliseconds(200)) {
- auto curOpCmd = BSON("aggregate" << 1 << "cursor" << BSONObj() << "pipeline"
- << BSON_ARRAY(BSON("$currentOp" << BSON("idleCursors" << true))
- << BSON("$match" << curOpMatch)));
- const auto startTime = clock->now();
- while (clock->now() - startTime < timeoutMS) {
- auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody("admin", curOpCmd));
- auto swCursorRes = CursorResponse::parseFromBSON(reply->getCommandReply());
- ASSERT_OK(swCursorRes.getStatus());
- if (swCursorRes.getValue().getBatch().empty() == expectEmptyResult) {
- return true;
+ // Polls $currentOp (including idle cursors) on 'conn', filtered by 'curOpMatch', until the
+ // emptiness of the first result batch equals 'expectEmptyResult' or 'timeoutMS' has elapsed.
+ //
+ // Returns true as soon as the expectation is met. On timeout, logs the unfiltered currentOp
+ // output together with the match that was never satisfied, and returns false. The poll sleeps
+ // for 'intervalMS' between attempts.
+ bool confirmCurrentOpContents(DBClientBase* conn,
+ BSONObj curOpMatch,
+ bool expectEmptyResult = false,
+ Milliseconds timeoutMS = Milliseconds(5 * 60 * 1000),
+ Milliseconds intervalMS = Milliseconds(200)) {
+ // Aggregation that lists current operations and keeps only those matching 'curOpMatch'.
+ auto curOpCmd =
+ BSON("aggregate" << 1 << "cursor" << BSONObj() << "pipeline"
+ << BSON_ARRAY(BSON("$currentOp" << BSON("idleCursors" << true))
+ << BSON("$match" << curOpMatch)));
+ const auto startTime = clock->now();
+ while (clock->now() - startTime < timeoutMS) {
+ auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody("admin", curOpCmd));
+ auto swCursorRes = CursorResponse::parseFromBSON(reply->getCommandReply());
+ ASSERT_OK(swCursorRes.getStatus());
+ // Done once "no matching operation" agrees with what the caller expects.
+ if (swCursorRes.getValue().getBatch().empty() == expectEmptyResult) {
+ return true;
+ }
+ sleepFor(intervalMS);
}
- sleepFor(intervalMS);
+ // Timed out: dump everything currentOp reports to help diagnose the failure.
+ auto currentOp = BSON("currentOp" << BSON("idleCursors" << true));
+ LOGV2(20606,
+ "confirmCurrentOpContents fails with curOpMatch: {curOpMatch} currentOp: "
+ "{conn_runCommand_OpMsgRequest_fromDBAndBody_admin_currentOp_getCommandReply}",
+ "curOpMatch"_attr = curOpMatch,
+ "conn_runCommand_OpMsgRequest_fromDBAndBody_admin_currentOp_getCommandReply"_attr =
+ conn->runCommand(OpMsgRequest::fromDBAndBody("admin", currentOp))
+ ->getCommandReply());
+ return false;
}
- auto currentOp = BSON("currentOp" << BSON("idleCursors" << true));
- LOGV2(20606,
- "confirmCurrentOpContents fails with curOpMatch: {curOpMatch} currentOp: "
- "{conn_runCommand_OpMsgRequest_fromDBAndBody_admin_currentOp_getCommandReply}",
- "curOpMatch"_attr = curOpMatch,
- "conn_runCommand_OpMsgRequest_fromDBAndBody_admin_currentOp_getCommandReply"_attr =
- conn->runCommand(OpMsgRequest::fromDBAndBody("admin", currentOp))->getCommandReply());
- return false;
-}
-repl::OpTime getLastAppliedOpTime(DBClientBase* conn) {
- auto reply =
- conn->runCommand(OpMsgRequest::fromDBAndBody("admin", BSON("replSetGetStatus" << 1)));
- ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply()));
- auto lastAppliedOpTime = reply->getCommandReply()["optimes"]["appliedOpTime"];
- return repl::OpTime(lastAppliedOpTime["ts"].timestamp(), lastAppliedOpTime["t"].numberLong());
-}
+ // Runs replSetGetStatus on 'conn', asserts that it succeeded, and returns the OpTime built
+ // from the 'ts' and 't' fields of 'optimes.appliedOpTime' in the reply.
+ repl::OpTime getLastAppliedOpTime(DBClientBase* conn) {
+ const auto statusCmd = BSON("replSetGetStatus" << 1);
+ auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody("admin", statusCmd));
+ const auto& statusReply = reply->getCommandReply();
+ ASSERT_OK(getStatusFromCommandResult(statusReply));
+ const auto appliedOpTime = statusReply["optimes"]["appliedOpTime"];
+ const auto appliedTs = appliedOpTime["ts"].timestamp();
+ const auto appliedTerm = appliedOpTime["t"].numberLong();
+ return repl::OpTime(appliedTs, appliedTerm);
+ }
-// Start an exhaust request with a batchSize of 2 in the initial 'find' and a batchSize of 1 in
-// subsequent 'getMore's.
-auto startExhaustQuery(
- DBClientBase* queryConnection,
- std::unique_ptr<DBClientCursor>& queryCursor,
- int queryOptions = 0,
- Milliseconds awaitDataTimeoutMS = Milliseconds(5000),
- const boost::optional<repl::OpTime>& lastKnownCommittedOpTime = boost::none) {
- queryOptions = queryOptions | QueryOption_Exhaust;
- auto queryThread = stdx::async(stdx::launch::async, [&] {
- const auto projSpec = BSON("_id" << 0 << "a" << 1);
- // Issue the initial 'find' with a batchSize of 2 and the exhaust flag set. We then iterate
- // through the first batch and confirm that the results are as expected.
- queryCursor = queryConnection->query(testNSS, {}, 0, 0, &projSpec, queryOptions, 2);
- for (int i = 0; i < 2; ++i) {
- ASSERT_BSONOBJ_EQ(queryCursor->nextSafe(), BSON("a" << i));
+ // Start an exhaust request with a batchSize of 2 in the initial 'find' and a batchSize of 1 in
+ // subsequent 'getMore's.
+ //
+ // The query runs on a background thread. This function returns that thread's future once the
+ // thread has stored the cursor into 'queryCursor', asserting if that does not happen within
+ // 'parallelWaitTimeoutMS'. 'queryConnection' and 'queryCursor' must stay valid until the
+ // returned future has completed. 'awaitDataTimeoutMS' and 'lastKnownCommittedOpTime' are only
+ // applied when 'queryOptions' requests a tailable, awaitData cursor.
+ auto startExhaustQuery(
+ DBClientBase* queryConnection,
+ std::unique_ptr<DBClientCursor>& queryCursor,
+ int queryOptions = 0,
+ Milliseconds awaitDataTimeoutMS = Milliseconds(5000),
+ const boost::optional<repl::OpTime>& lastKnownCommittedOpTime = boost::none) {
+ queryOptions = queryOptions | QueryOption_Exhaust;
+ if (_op == dbQuery) {
+ queryOptions |= DBClientCursor::QueryOptionLocal_forceOpQuery;
}
- // Having exhausted the two results returned by the initial find, we set the batchSize to 1
- // and issue a single getMore via DBClientCursor::more(). Because the 'exhaust' flag is set,
- // the server will generate a series of internal getMores and stream them back to the client
- // until the cursor is exhausted, without the client sending any further getMore requests.
- // We expect this request to hang at the 'waitWithPinnedCursorDuringGetMoreBatch' failpoint.
- queryCursor->setBatchSize(1);
- if ((queryOptions & QueryOption_CursorTailable) && (queryOptions & QueryOption_AwaitData)) {
- queryCursor->setAwaitDataTimeoutMS(awaitDataTimeoutMS);
- if (lastKnownCommittedOpTime) {
- auto term = lastKnownCommittedOpTime.get().getTerm();
- queryCursor->setCurrentTermAndLastCommittedOpTime(term, lastKnownCommittedOpTime);
+ // The worker keeps running after this function returns, so it must not hold references to
+ // this function's parameters: capture those by value. Only 'queryCursor', which the caller
+ // owns, is captured by reference.
+ auto queryThread = stdx::async(
+ stdx::launch::async,
+ [this,
+ queryConnection,
+ &queryCursor,
+ queryOptions,
+ awaitDataTimeoutMS,
+ lastKnownCommittedOpTime] {
+ // Issue the initial 'find' with a batchSize of 2 and the exhaust flag set. We then
+ // iterate through the first batch and confirm that the results are as expected.
+ queryCursor = queryConnection->query(testNSS, {}, 0, 0, nullptr, queryOptions, 2);
+ for (int i = 0; i < 2; ++i) {
+ auto doc = queryCursor->nextSafe();
+ ASSERT(!doc["a"].eoo());
+ ASSERT(doc["a"].safeNumberInt() == i);
}
+ // Having exhausted the two results returned by the initial find, we set the batchSize
+ // to 1 and issue a single getMore via DBClientCursor::more(). Because the 'exhaust'
+ // flag is set, the server will generate a series of internal getMores and stream them
+ // back to the client until the cursor is exhausted, without the client sending any
+ // further getMore requests. We expect this request to hang at the
+ // 'waitWithPinnedCursorDuringGetMoreBatch' failpoint.
+ queryCursor->setBatchSize(1);
+ if ((queryOptions & QueryOption_CursorTailable) &&
+ (queryOptions & QueryOption_AwaitData)) {
+ queryCursor->setAwaitDataTimeoutMS(awaitDataTimeoutMS);
+ if (lastKnownCommittedOpTime) {
+ auto term = lastKnownCommittedOpTime.get().getTerm();
+ queryCursor->setCurrentTermAndLastCommittedOpTime(term,
+ lastKnownCommittedOpTime);
+ }
+ }
+ ASSERT(queryCursor->more());
+ });
+
+ // Wait until the parallel operation initializes its cursor.
+ // NOTE(review): 'queryCursor' is polled here while the worker thread assigns it, with no
+ // synchronization between the two - consider a promise/future or atomic hand-off.
+ const auto startTime = clock->now();
+ while (!queryCursor && (clock->now() - startTime < parallelWaitTimeoutMS)) {
+ sleepFor(Milliseconds(10));
}
- ASSERT(queryCursor->more());
- });
+ ASSERT(queryCursor);
+ LOGV2(20607,
+ "Started exhaust query with cursorId: {queryCursor_getCursorId}",
+ "queryCursor_getCursorId"_attr = queryCursor->getCursorId());
+ return queryThread;
+ }
- // Wait until the parallel operation initializes its cursor.
- const auto startTime = clock->now();
- while (!queryCursor && (clock->now() - startTime < parallelWaitTimeoutMS)) {
- sleepFor(Milliseconds(10));
+ // Lets exactly one getMore on 'queryCursor' run to completion while keeping the next one
+ // blocked.
+ //
+ // Expects the getMore to be parked on 'waitWithPinnedCursorDuringGetMoreBatch' already. If
+ // 'nDocsReturned' is set, the parked operation must also report that value in
+ // 'cursor.nDocsReturned'; pass boost::none to skip that check. On return the
+ // 'waitWithPinnedCursorDuringGetMoreBatch' failpoint is enabled again and the post-getMore
+ // failpoint is disabled.
+ void runOneGetMore(DBClientBase* conn,
+ const std::unique_ptr<DBClientCursor>& queryCursor,
+ boost::optional<int> nDocsReturned) {
+ // Build the $currentOp match describing the getMore parked on the failpoint.
+ BSONObjBuilder bob;
+ bob.append("command.collection", testNSS.coll());
+ bob.append("command.getMore", queryCursor->getCursorId());
+ bob.append("msg", "waitWithPinnedCursorDuringGetMoreBatch");
+ if (nDocsReturned) {
+ bob.append("cursor.nDocsReturned", *nDocsReturned);
+ }
+ const auto curOpMatch = bob.done();
+ // Confirm that the initial getMore appears in the $currentOp output.
+ ASSERT(confirmCurrentOpContents(conn, curOpMatch));
+
+ // Airlock the failpoint by releasing it only after we enable a post-getMore failpoint. This
+ // ensures that no subsequent getMores can run before we re-enable the original failpoint.
+ setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn, true);
+ setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn, false);
+ // Confirm that the getMore completed its batch and hit the post-getMore failpoint.
+ ASSERT(confirmCurrentOpContents(
+ conn,
+ BSON("command.getMore" << queryCursor->getCursorId() << "msg"
+ << "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch")));
+ // Re-enable the original failpoint to catch the next getMore, and release the current one.
+ setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn, true);
+ setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn, false);
}
- ASSERT(queryCursor);
- LOGV2(20607,
- "Started exhaust query with cursorId: {queryCursor_getCursorId}",
- "queryCursor_getCursorId"_attr = queryCursor->getCursorId());
- return queryThread;
-}
-void runOneGetMore(DBClientBase* conn,
- const std::unique_ptr<DBClientCursor>& queryCursor,
- int nDocsReturned) {
- const auto curOpMatch = BSON("command.collection" << testNSS.coll() << "command.getMore"
- << queryCursor->getCursorId() << "msg"
- << "waitWithPinnedCursorDuringGetMoreBatch"
- << "cursor.nDocsReturned" << nDocsReturned);
- // Confirm that the initial getMore appears in the $currentOp output.
- ASSERT(confirmCurrentOpContents(conn, curOpMatch));
+ // Test exhaust cursor is cleaned up on client disconnect. By default, the test client
+ // disconnects while the exhaust getMore is running. If disconnectAfterGetMoreBatch is set to
+ // true, the test client disconnects after the exhaust getMore is run but before the server
+ // sends out the response.
+ //
+ // The scenario is skipped (returns immediately) when connected to a replica set member or a
+ // mongos.
+ void testClientDisconnect(bool disconnectAfterGetMoreBatch) {
+ auto conn = connect();
+
+ // We need to set failpoints around getMore which cause it to hang, so only test against a
+ // single server rather than a replica set or mongoS. Mongos doesn't support exhaust at all
+ // and the failpoints might disrupt the completion of other commands (like monitoring hellos
+ // and heartbeats) for replica sets.
+ if (conn->isReplicaSetMember() || conn->isMongos()) {
+ return;
+ }
- // Airlock the failpoint by releasing it only after we enable a post-getMore failpoint. This
- // ensures that no subsequent getMores can run before we re-enable the original failpoint.
- setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn, true);
- setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn, false);
- // Confirm that the getMore completed its batch and hit the post-getMore failpoint.
- ASSERT(confirmCurrentOpContents(
- conn,
- BSON("command.getMore" << queryCursor->getCursorId() << "msg"
- << "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch")));
+ initTestCollection(conn.get());
+
+ // Enable a failpoint to block getMore during execution.
+ setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), true);
+
+ const auto connStr = unittest::getFixtureConnectionString();
+ const auto queryConnection = std::make_unique<DBClientConnection>();
+ uassertStatusOK(queryConnection->connect(connStr.getServers()[0], testBackgroundAppName));
+ std::unique_ptr<DBClientCursor> queryCursor;
+
+ // Execute a query on a separate thread, with the 'exhaust' flag set.
+ auto queryThread = startExhaustQuery(queryConnection.get(), queryCursor);
+ // Ensure that, regardless of whether the test completes or fails, we release all
+ // failpoints. This includes the post-getMore failpoint, which is only enabled on the
+ // disconnectAfterGetMoreBatch path: an assertion failing between enabling and disabling it
+ // would otherwise leave it switched on in the server.
+ ON_BLOCK_EXIT([&conn, &queryThread, this] {
+ setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false);
+ setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn.get(), false);
+ setWaitAfterGetMoreFinishesExecutionFailpoint(conn.get(), false);
+ queryThread.wait();
+ });
+
+ // This will allow the initial getMore to run.
+ if (_op == dbQuery) {
+ // We can't know how many docs to expect when using dbQuery
+ // because the batch size is determined by the server.
+ runOneGetMore(conn.get(), queryCursor, boost::none);
+ } else {
+ runOneGetMore(conn.get(), queryCursor, 2);
+ }
- // Re-enable the original failpoint to catch the next getMore, and release the current one.
- setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn, true);
- setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn, false);
-}
+ // The next getMore will be an exhaust getMore. Confirm that the exhaust getMore appears in
+ // the $currentOp output.
+ BSONObjBuilder bob;
+ bob.append("command.collection", testNSS.coll());
+ bob.append("command.getMore", queryCursor->getCursorId());
+ bob.append("msg", "waitWithPinnedCursorDuringGetMoreBatch");
+ if (_op == dbMsg) {
+ // Only pin the returned-document count for dbMsg. With dbQuery we can't know how many
+ // docs to expect because the batch size is determined by the server.
+ bob.append("cursor.nDocsReturned", 3);
+ }
+ auto curOpMatch = bob.done();
+ ASSERT(confirmCurrentOpContents(conn.get(), curOpMatch));
+
+ if (disconnectAfterGetMoreBatch) {
+ // Allow the exhaust getMore to run but block it before sending out the response.
+ std::string failPointMsg =
+ setWaitAfterGetMoreFinishesExecutionFailpoint(conn.get(), true);
+ setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false);
+ ASSERT(confirmCurrentOpContents(
+ conn.get(),
+ BSON("command.getMore" << queryCursor->getCursorId() << "msg" << failPointMsg)));
+ }
+
+ // Kill the client connection while the exhaust getMore is blocked on the failpoint.
+ queryConnection->shutdownAndDisallowReconnect();
+ LOGV2(20608, "Killed exhaust connection.");
+
+ if (disconnectAfterGetMoreBatch) {
+ // Disable the failpoint to allow the exhaust getMore to continue sending out the
+ // response after the client disconnects. This will result in a broken pipe error.
+ setWaitAfterGetMoreFinishesExecutionFailpoint(conn.get(), false);
+ }
+
+ curOpMatch = BSON("command.collection" << testNSS.coll() << "command.getMore"
+ << queryCursor->getCursorId());
+ // Confirm that the exhaust getMore was interrupted and does not appear in the $currentOp
+ // output.
+ const bool expectEmptyResult = true;
+ ASSERT(confirmCurrentOpContents(conn.get(), curOpMatch, expectEmptyResult));
+
+ setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false);
+ setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn.get(), false);
+
+ curOpMatch = BSON("type"
+ << "idleCursor"
+ << "cursor.cursorId" << queryCursor->getCursorId());
+ // Confirm that the cursor was cleaned up and does not appear in the $currentOp idleCursor
+ // output.
+ ASSERT(confirmCurrentOpContents(conn.get(), curOpMatch, expectEmptyResult));
+ }
+
+ // Specifies the amount of time we are willing to wait for a parallel operation to complete.
+ Milliseconds parallelWaitTimeoutMS = Milliseconds(5 * 60 * 1000);
+
+ // Obtain a pointer to the global system clock. Used to enforce timeouts in the parallel thread.
+ SystemClockSource* clock = SystemClockSource::get();
+
+ // Namespace that the tests drop, repopulate and query.
+ const NamespaceString testNSS{"exhaust_cursor_currentop.testColl"};
+
+ // Application name used by connect() when the caller does not supply one.
+ const StringData testAppName = "curop_exhaust_cursor_test";
+
+ // Application name given to the separate connection that issues the exhaust query.
+ const StringData testBackgroundAppName = "curop_exhaust_cursor_test_bg";
+
+ // Network op the helpers exercise: dbMsg unless a test or the constructor changes it.
+ NetworkOp _op{dbMsg};
+};
} // namespace
-TEST(CurrentOpExhaustCursorTest, CanSeeEachExhaustCursorPseudoGetMoreInCurrentOpOutput) {
+TEST_F(CurrentOpExhaustCursorTestFixture, CanSeeEachExhaustCursorPseudoGetMoreInCurrentOpOutput) {
auto conn = connect();
// We need to set failpoints around getMore which cause it to hang, so only test against a
@@ -225,7 +373,7 @@ TEST(CurrentOpExhaustCursorTest, CanSeeEachExhaustCursorPseudoGetMoreInCurrentOp
// Execute a query on a separate thread, with the 'exhaust' flag set.
auto queryThread = startExhaustQuery(queryConnection.get(), queryCursor);
// Ensure that, regardless of whether the test completes or fails, we release all failpoints.
- ON_BLOCK_EXIT([&conn, &queryThread] {
+ ON_BLOCK_EXIT([&conn, &queryThread, this] {
setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false);
setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn.get(), false);
queryThread.wait();
@@ -240,97 +388,31 @@ TEST(CurrentOpExhaustCursorTest, CanSeeEachExhaustCursorPseudoGetMoreInCurrentOp
}
}
-// Test exhaust cursor is cleaned up on client disconnect. By default, the test client disconnects
-// while the exhaust getMore is running. If disconnectAfterGetMoreBatch is set to true, the test
-// client disconnects after the exhaust getMore is run but before the server sends out the response.
-void testClientDisconnect(bool disconnectAfterGetMoreBatch) {
- auto conn = connect();
-
- // We need to set failpoints around getMore which cause it to hang, so only test against a
- // single server rather than a replica set or mongoS.
- if (conn->isReplicaSetMember() || conn->isMongos()) {
- return;
- }
-
- initTestCollection(conn.get());
-
- // Enable a failpoint to block getMore during execution.
- setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), true);
-
- const auto connStr = unittest::getFixtureConnectionString();
- const auto queryConnection = std::make_unique<DBClientConnection>();
- uassertStatusOK(queryConnection->connect(connStr.getServers()[0], testBackgroundAppName));
- std::unique_ptr<DBClientCursor> queryCursor;
-
- // Execute a query on a separate thread, with the 'exhaust' flag set.
- auto queryThread = startExhaustQuery(queryConnection.get(), queryCursor);
- // Ensure that, regardless of whether the test completes or fails, we release all failpoints.
- ON_BLOCK_EXIT([&conn, &queryThread] {
- setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false);
- setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn.get(), false);
- queryThread.wait();
- });
-
- // This will allow the initial getMore to run.
- runOneGetMore(conn.get(), queryCursor, 2);
- // The next getMore will be an exhaust getMore. Confirm that the exhaust getMore appears in the
- // $currentOp output.
- auto curOpMatch = BSON("command.collection" << testNSS.coll() << "command.getMore"
- << queryCursor->getCursorId() << "msg"
- << "waitWithPinnedCursorDuringGetMoreBatch"
- << "cursor.nDocsReturned" << 3);
- ASSERT(confirmCurrentOpContents(conn.get(), curOpMatch));
-
- if (disconnectAfterGetMoreBatch) {
- // Allow the exhaust getMore to run but block it before sending out the response.
- setWaitAfterCommandFinishesExecutionFailpoint(conn.get(), true);
- setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false);
- ASSERT(confirmCurrentOpContents(conn.get(),
- BSON("command.getMore"
- << queryCursor->getCursorId() << "msg"
- << "waitAfterCommandFinishesExecution")));
- }
-
- // Kill the client connection while the exhaust getMore is blocked on the failpoint.
- queryConnection->shutdownAndDisallowReconnect();
- LOGV2(20608, "Killed exhaust connection.");
-
- if (disconnectAfterGetMoreBatch) {
- // Disable the failpoint to allow the exhaust getMore to continue sending out the response
- // after the client disconnects. This will result in a broken pipe error.
- setWaitAfterCommandFinishesExecutionFailpoint(conn.get(), false);
- }
+// An exhaust getMore that is still running must be interrupted when its client disconnects.
+TEST_F(CurrentOpExhaustCursorTestFixture, InterruptExhaustCursorPseudoGetMoreOnClientDisconnect) {
+ constexpr bool disconnectAfterGetMoreBatch = false;
+ testClientDisconnect(disconnectAfterGetMoreBatch);
+}
- curOpMatch = BSON("command.collection" << testNSS.coll() << "command.getMore"
- << queryCursor->getCursorId());
- // Confirm that the exhaust getMore was interrupted and does not appear in the $currentOp
- // output.
- const bool expectEmptyResult = true;
- ASSERT(confirmCurrentOpContents(conn.get(), curOpMatch, expectEmptyResult));
-
- setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false);
- setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn.get(), false);
-
- curOpMatch = BSON("type"
- << "idleCursor"
- << "cursor.cursorId" << queryCursor->getCursorId());
- // Confirm that the cursor was cleaned up and does not appear in the $currentOp idleCursor
- // output.
- ASSERT(confirmCurrentOpContents(conn.get(), curOpMatch, expectEmptyResult));
+// The exhaust cursor must be cleaned up on a broken pipe even though the exhaust getMore itself
+// succeeded.
+TEST_F(CurrentOpExhaustCursorTestFixture, CleanupExhaustCursorOnBrokenPipe) {
+ constexpr bool disconnectAfterGetMoreBatch = true;
+ testClientDisconnect(disconnectAfterGetMoreBatch);
+}
-TEST(CurrentOpExhaustCursorTest, InterruptExhaustCursorPseudoGetMoreOnClientDisconnect) {
+// Runs the disconnect-during-getMore scenario with the fixture switched to dbQuery.
+TEST_F(CurrentOpExhaustCursorTestFixture,
+ InterruptExhaustCursorPseudoGetMoreOnClientDisconnectDbQuery) {
+ // Select the dbQuery behavior of the fixture helpers before running the shared scenario.
+ _op = dbQuery;
// Test that an exhaust getMore is interrupted on client disconnect.
testClientDisconnect(false /* disconnectAfterGetMoreBatch */);
}
-TEST(CurrentOpExhaustCursorTest, CleanupExhaustCursorOnBrokenPipe) {
+// Runs the broken-pipe cleanup scenario with the fixture switched to dbQuery.
+TEST_F(CurrentOpExhaustCursorTestFixture, CleanupExhaustCursorOnBrokenPipeDbQuery) {
+ // Select the dbQuery behavior of the fixture helpers before running the shared scenario.
+ _op = dbQuery;
// Test that exhaust cursor is cleaned up on broken pipe even if the exhaust getMore succeeded.
testClientDisconnect(true /* disconnectAfterGetMoreBatch */);
}
-TEST(CurrentOpExhaustCursorTest, ExhaustCursorUpdatesLastKnownCommittedOpTime) {
+TEST_F(CurrentOpExhaustCursorTestFixture, ExhaustCursorUpdatesLastKnownCommittedOpTime) {
auto fixtureConn = connect();
// We need to test the lastKnownCommittedOpTime in exhaust getMore requests. So we need a
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index c9355a84d58..16ff3dae0d1 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -113,6 +113,7 @@ MONGO_FAIL_POINT_DEFINE(skipCheckingForNotPrimaryInCommandDispatch);
MONGO_FAIL_POINT_DEFINE(sleepMillisAfterCommandExecutionBegins);
MONGO_FAIL_POINT_DEFINE(waitAfterNewStatementBlocksBehindPrepare);
MONGO_FAIL_POINT_DEFINE(waitAfterCommandFinishesExecution);
+MONGO_FAIL_POINT_DEFINE(waitAfterOpGetMoreFinishesExecution);
MONGO_FAIL_POINT_DEFINE(failWithErrorCodeInRunCommand);
MONGO_FAIL_POINT_DEFINE(hangBeforeSessionCheckOut);
MONGO_FAIL_POINT_DEFINE(hangBeforeSettingTxnInterruptFlag);
@@ -1655,6 +1656,8 @@ DbResponse receivedGetMore(OperationContext* opCtx,
curop.debug().responseLength = dbresponse.response.header().dataLen();
curop.debug().nreturned = 1;
*shouldLogOpDebug = true;
+ CurOpFailpointHelpers::waitWhileFailPointEnabled(
+ &waitAfterOpGetMoreFinishesExecution, opCtx, "waitAfterOpGetMoreFinishesExecution");
return dbresponse;
}
@@ -1667,6 +1670,8 @@ DbResponse receivedGetMore(OperationContext* opCtx,
dbresponse.shouldRunAgainForExhaust = true;
}
+ CurOpFailpointHelpers::waitWhileFailPointEnabled(
+ &waitAfterOpGetMoreFinishesExecution, opCtx, "waitAfterOpGetMoreFinishesExecution");
return dbresponse;
}
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index 42a633829af..057799dd435 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -684,10 +684,20 @@ void ServiceStateMachine::_cleanupExhaustResources() noexcept try {
if (!_inExhaust) {
return;
}
- auto request = OpMsgRequest::parse(_inMessage);
+ long long cursorId = 0;
+ if (_inMessage.operation() == dbGetMore) {
+ DbMessage dbm(_inMessage);
+ [[maybe_unused]] const int ntoreturn = dbm.pullInt();
+ cursorId = dbm.pullInt64();
+ } else {
+ invariant(_inMessage.operation() == dbMsg);
+ auto request = OpMsgRequest::parse(_inMessage);
+ if (request.getCommandName() == "getMore"_sd) {
+ cursorId = request.body["getMore"].Long();
+ }
+ }
// Clean up cursor for exhaust getMore request.
- if (request.getCommandName() == "getMore"_sd) {
- auto cursorId = request.body["getMore"].Long();
+ if (cursorId != 0) {
auto opCtx = Client::getCurrent()->makeOperationContext();
// Fire and forget. This is a best effort attempt to immediately clean up the exhaust
// cursor. If the killCursors request fails here for any reasons, it will still be