diff options
author | George Wangensteen <george.wangensteen@mongodb.com> | 2022-08-15 19:58:34 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-08-16 14:50:42 +0000 |
commit | 954ae7c6e4d33766170f767a0ad4807b8a7b215e (patch) | |
tree | 48659bd7f29563ba3100024dfd9d1195ee5015cb | |
parent | 0f0d3670dc5568157d6230eead322389a8e2d60b (diff) | |
download | mongo-954ae7c6e4d33766170f767a0ad4807b8a7b215e.tar.gz |
SERVER-68039 Support cleaning up dbQuery/dbGetMore exhaust cursors safely
(cherry picked from commit effae26eda3c0f4f71d1e7b48bb1291497dab25d)
-rw-r--r-- | src/mongo/db/exhaust_cursor_currentop_integration_test.cpp | 522 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 5 | ||||
-rw-r--r-- | src/mongo/transport/service_state_machine.cpp | 16 |
3 files changed, 320 insertions, 223 deletions
diff --git a/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp b/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp index fae024557dc..4b701ea3439 100644 --- a/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp +++ b/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp @@ -43,169 +43,317 @@ namespace mongo { namespace { -// Specifies the amount of time we are willing to wait for a parallel operation to complete. -const auto parallelWaitTimeoutMS = Milliseconds(5 * 60 * 1000); -// Obtain a pointer to the global system clock. Used to enforce timeouts in the parallel thread. -auto* const clock = SystemClockSource::get(); +class CurrentOpExhaustCursorTestFixture : public unittest::Test { +public: + CurrentOpExhaustCursorTestFixture() = default; + explicit CurrentOpExhaustCursorTestFixture(NetworkOp op) : _op{op} {} -const NamespaceString testNSS{"exhaust_cursor_currentop.testColl"}; + std::unique_ptr<DBClientBase> connect() { + return connect(testAppName); + } -const StringData testAppName = "curop_exhaust_cursor_test"; -std::unique_ptr<DBClientBase> connect(StringData appName = testAppName) { - std::string errMsg; - auto conn = unittest::getFixtureConnectionString().connect(appName.toString(), errMsg); - uassert(ErrorCodes::SocketException, errMsg, conn); - return conn; -} -const StringData testBackgroundAppName = "curop_exhaust_cursor_test_bg"; + std::unique_ptr<DBClientBase> connect(StringData appName) { + std::string errMsg; + auto conn = unittest::getFixtureConnectionString().connect(appName.toString(), errMsg); + uassert(ErrorCodes::SocketException, errMsg, conn); + return conn; + } -void initTestCollection(DBClientBase* conn) { - // Drop and recreate the test namespace. - conn->dropCollection(testNSS.ns()); - for (int i = 0; i < 10; i++) { - auto insertCmd = - BSON("insert" << testNSS.coll() << "documents" << BSON_ARRAY(BSON("a" << i))); - auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody(testNSS.db(), insertCmd)); - ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply())); + void initTestCollection(DBClientBase* conn) { + // Drop and recreate the test namespace. + conn->dropCollection(testNSS.ns()); + if (_op == dbMsg) { + for (int i = 0; i < 10; i++) { + auto insertCmd = + BSON("insert" << testNSS.coll() << "documents" << BSON_ARRAY(BSON("a" << i))); + auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody(testNSS.db(), insertCmd)); + ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply())); + } + } else if (_op == dbQuery) { + // For dbQuery exhaust, the getMore batch size is fixed at the server-defined + // limit of 16Mb, and can't be manipulated. We need to insert enough documents + // to make sure we have multiple batches of getMores given that limit; + // as a result, we create approximately 16KB documents and insert 3000 of them + // to ensure that three getMores are required to exhaust the cursor. + constexpr int numDocs = 3000; + constexpr int strSize = 16 * 1024; + for (int i = 0; i < numDocs; i++) { + std::string bigString("A", strSize); + auto insertCmd = + BSON("insert" << testNSS.coll() << "documents" + << BSON_ARRAY(BSON("a" << i << "data" << bigString))); + auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody(testNSS.db(), insertCmd)); + ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply())); + } + } } -} -void setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(DBClientBase* conn, bool enable) { - auto cmdObj = BSON("configureFailPoint" - << "waitWithPinnedCursorDuringGetMoreBatch" - << "mode" << (enable ? "alwaysOn" : "off") << "data" - << BSON("shouldNotdropLock" << true << "shouldContinueOnInterrupt" << true)); - auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody("admin", cmdObj)); - ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply())); -} + void setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(DBClientBase* conn, bool enable) { + auto cmdObj = + BSON("configureFailPoint" + << "waitWithPinnedCursorDuringGetMoreBatch" + << "mode" << (enable ? "alwaysOn" : "off") << "data" + << BSON("shouldNotdropLock" << true << "shouldContinueOnInterrupt" << true)); + auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody("admin", cmdObj)); + ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply())); + } -void setWaitAfterCommandFinishesExecutionFailpoint(DBClientBase* conn, bool enable) { - auto cmdObj = BSON("configureFailPoint" - << "waitAfterCommandFinishesExecution" - << "mode" << (enable ? "alwaysOn" : "off") << "data" - << BSON("ns" << testNSS.toString())); - auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody("admin", cmdObj)); - ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply())); -} + std::string setWaitAfterGetMoreFinishesExecutionFailpoint(DBClientBase* conn, bool enable) { + // Since one uses the command path and the other uses the OP_GET_MORE path, we + // need to select the appropriate post-get-more execution fail point depending on the + // op we are using. + std::string fpName = (_op == dbMsg) ? "waitAfterCommandFinishesExecution" + : "waitAfterOpGetMoreFinishesExecution"; + auto cmdObj = BSON("configureFailPoint" << fpName << "mode" << (enable ? "alwaysOn" : "off") + << "data" << BSON("ns" << testNSS.toString())); + auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody("admin", cmdObj)); + ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply())); + // return the appropriate failPointMsg for the failpoint we chose. + return fpName; + } -void setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(DBClientBase* conn, - bool enable) { - auto cmdObj = BSON("configureFailPoint" - << "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch" - << "mode" << (enable ? "alwaysOn" : "off")); - auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody("admin", cmdObj)); - ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply())); -} + void setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(DBClientBase* conn, + bool enable) { + auto cmdObj = BSON("configureFailPoint" + << "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch" + << "mode" << (enable ? "alwaysOn" : "off")); + auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody("admin", cmdObj)); + ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply())); + } -bool confirmCurrentOpContents(DBClientBase* conn, - BSONObj curOpMatch, - bool expectEmptyResult = false, - Milliseconds timeoutMS = Milliseconds(5 * 60 * 1000), - Milliseconds intervalMS = Milliseconds(200)) { - auto curOpCmd = BSON("aggregate" << 1 << "cursor" << BSONObj() << "pipeline" - << BSON_ARRAY(BSON("$currentOp" << BSON("idleCursors" << true)) - << BSON("$match" << curOpMatch))); - const auto startTime = clock->now(); - while (clock->now() - startTime < timeoutMS) { - auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody("admin", curOpCmd)); - auto swCursorRes = CursorResponse::parseFromBSON(reply->getCommandReply()); - ASSERT_OK(swCursorRes.getStatus()); - if (swCursorRes.getValue().getBatch().empty() == expectEmptyResult) { - return true; + bool confirmCurrentOpContents(DBClientBase* conn, + BSONObj curOpMatch, + bool expectEmptyResult = false, + Milliseconds timeoutMS = Milliseconds(5 * 60 * 1000), + Milliseconds intervalMS = Milliseconds(200)) { + auto curOpCmd = + BSON("aggregate" << 1 << "cursor" << BSONObj() << "pipeline" + << BSON_ARRAY(BSON("$currentOp" << BSON("idleCursors" << true)) + << BSON("$match" << curOpMatch))); + const auto startTime = clock->now(); + while (clock->now() - startTime < timeoutMS) { + auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody("admin", curOpCmd)); + auto swCursorRes = CursorResponse::parseFromBSON(reply->getCommandReply()); + ASSERT_OK(swCursorRes.getStatus()); + if (swCursorRes.getValue().getBatch().empty() == expectEmptyResult) { + return true; + } + sleepFor(intervalMS); } - sleepFor(intervalMS); + auto currentOp = BSON("currentOp" << BSON("idleCursors" << true)); + LOGV2(20606, + "confirmCurrentOpContents fails with curOpMatch: {curOpMatch} currentOp: " + "{conn_runCommand_OpMsgRequest_fromDBAndBody_admin_currentOp_getCommandReply}", + "curOpMatch"_attr = curOpMatch, + "conn_runCommand_OpMsgRequest_fromDBAndBody_admin_currentOp_getCommandReply"_attr = + conn->runCommand(OpMsgRequest::fromDBAndBody("admin", currentOp)) + ->getCommandReply()); + return false; } - auto currentOp = BSON("currentOp" << BSON("idleCursors" << true)); - LOGV2(20606, - "confirmCurrentOpContents fails with curOpMatch: {curOpMatch} currentOp: " - "{conn_runCommand_OpMsgRequest_fromDBAndBody_admin_currentOp_getCommandReply}", - "curOpMatch"_attr = curOpMatch, - "conn_runCommand_OpMsgRequest_fromDBAndBody_admin_currentOp_getCommandReply"_attr = - conn->runCommand(OpMsgRequest::fromDBAndBody("admin", currentOp))->getCommandReply()); - return false; -} -repl::OpTime getLastAppliedOpTime(DBClientBase* conn) { - auto reply = - conn->runCommand(OpMsgRequest::fromDBAndBody("admin", BSON("replSetGetStatus" << 1))); - ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply())); - auto lastAppliedOpTime = reply->getCommandReply()["optimes"]["appliedOpTime"]; - return repl::OpTime(lastAppliedOpTime["ts"].timestamp(), lastAppliedOpTime["t"].numberLong()); -} + repl::OpTime getLastAppliedOpTime(DBClientBase* conn) { + auto reply = + conn->runCommand(OpMsgRequest::fromDBAndBody("admin", BSON("replSetGetStatus" << 1))); + ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply())); + auto lastAppliedOpTime = reply->getCommandReply()["optimes"]["appliedOpTime"]; + return repl::OpTime(lastAppliedOpTime["ts"].timestamp(), + lastAppliedOpTime["t"].numberLong()); + } -// Start an exhaust request with a batchSize of 2 in the initial 'find' and a batchSize of 1 in -// subsequent 'getMore's. -auto startExhaustQuery( - DBClientBase* queryConnection, - std::unique_ptr<DBClientCursor>& queryCursor, - int queryOptions = 0, - Milliseconds awaitDataTimeoutMS = Milliseconds(5000), - const boost::optional<repl::OpTime>& lastKnownCommittedOpTime = boost::none) { - queryOptions = queryOptions | QueryOption_Exhaust; - auto queryThread = stdx::async(stdx::launch::async, [&] { - const auto projSpec = BSON("_id" << 0 << "a" << 1); - // Issue the initial 'find' with a batchSize of 2 and the exhaust flag set. We then iterate - // through the first batch and confirm that the results are as expected. - queryCursor = queryConnection->query(testNSS, {}, 0, 0, &projSpec, queryOptions, 2); - for (int i = 0; i < 2; ++i) { - ASSERT_BSONOBJ_EQ(queryCursor->nextSafe(), BSON("a" << i)); + // Start an exhaust request with a batchSize of 2 in the initial 'find' and a batchSize of 1 in + // subsequent 'getMore's. + auto startExhaustQuery( + DBClientBase* queryConnection, + std::unique_ptr<DBClientCursor>& queryCursor, + int queryOptions = 0, + Milliseconds awaitDataTimeoutMS = Milliseconds(5000), + const boost::optional<repl::OpTime>& lastKnownCommittedOpTime = boost::none) { + queryOptions = queryOptions | QueryOption_Exhaust; + if (_op == dbQuery) { + queryOptions |= DBClientCursor::QueryOptionLocal_forceOpQuery; } - // Having exhausted the two results returned by the initial find, we set the batchSize to 1 - // and issue a single getMore via DBClientCursor::more(). Because the 'exhaust' flag is set, - // the server will generate a series of internal getMores and stream them back to the client - // until the cursor is exhausted, without the client sending any further getMore requests. - // We expect this request to hang at the 'waitWithPinnedCursorDuringGetMoreBatch' failpoint. - queryCursor->setBatchSize(1); - if ((queryOptions & QueryOption_CursorTailable) && (queryOptions & QueryOption_AwaitData)) { - queryCursor->setAwaitDataTimeoutMS(awaitDataTimeoutMS); - if (lastKnownCommittedOpTime) { - auto term = lastKnownCommittedOpTime.get().getTerm(); - queryCursor->setCurrentTermAndLastCommittedOpTime(term, lastKnownCommittedOpTime); + auto queryThread = stdx::async(stdx::launch::async, [&] { + // Issue the initial 'find' with a batchSize of 2 and the exhaust flag set. We then + // iterate through the first batch and confirm that the results are as expected. + queryCursor = queryConnection->query(testNSS, {}, 0, 0, nullptr, queryOptions, 2); + for (int i = 0; i < 2; ++i) { + auto doc = queryCursor->nextSafe(); + ASSERT(!doc["a"].eoo()); + ASSERT(doc["a"].safeNumberInt() == i); } + // Having exhausted the two results returned by the initial find, we set the batchSize + // to 1 and issue a single getMore via DBClientCursor::more(). Because the 'exhaust' + // flag is set, the server will generate a series of internal getMores and stream them + // back to the client until the cursor is exhausted, without the client sending any + // further getMore requests. We expect this request to hang at the + // 'waitWithPinnedCursorDuringGetMoreBatch' failpoint. + queryCursor->setBatchSize(1); + if ((queryOptions & QueryOption_CursorTailable) && + (queryOptions & QueryOption_AwaitData)) { + queryCursor->setAwaitDataTimeoutMS(awaitDataTimeoutMS); + if (lastKnownCommittedOpTime) { + auto term = lastKnownCommittedOpTime.get().getTerm(); + queryCursor->setCurrentTermAndLastCommittedOpTime(term, + lastKnownCommittedOpTime); + } + } + ASSERT(queryCursor->more()); + }); + + // Wait until the parallel operation initializes its cursor. + const auto startTime = clock->now(); + while (!queryCursor && (clock->now() - startTime < parallelWaitTimeoutMS)) { + sleepFor(Milliseconds(10)); } - ASSERT(queryCursor->more()); - }); + ASSERT(queryCursor); + LOGV2(20607, + "Started exhaust query with cursorId: {queryCursor_getCursorId}", + "queryCursor_getCursorId"_attr = queryCursor->getCursorId()); + return queryThread; + } - // Wait until the parallel operation initializes its cursor. - const auto startTime = clock->now(); - while (!queryCursor && (clock->now() - startTime < parallelWaitTimeoutMS)) { - sleepFor(Milliseconds(10)); + void runOneGetMore(DBClientBase* conn, + const std::unique_ptr<DBClientCursor>& queryCursor, + boost::optional<int> nDocsReturned) { + BSONObjBuilder bob; + bob.append("command.collection", testNSS.coll()); + bob.append("command.getMore", queryCursor->getCursorId()); + bob.append("msg", "waitWithPinnedCursorDuringGetMoreBatch"); + if (nDocsReturned) { + bob.append("cursor.nDocsReturned", *nDocsReturned); + } + const auto curOpMatch = bob.done(); + // Confirm that the initial getMore appears in the $currentOp output. + ASSERT(confirmCurrentOpContents(conn, curOpMatch)); + + // Airlock the failpoint by releasing it only after we enable a post-getMore failpoint. This + // ensures that no subsequent getMores can run before we re-enable the original failpoint. + setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn, true); + setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn, false); + // Confirm that the getMore completed its batch and hit the post-getMore failpoint. + ASSERT(confirmCurrentOpContents( + conn, + BSON("command.getMore" << queryCursor->getCursorId() << "msg" + << "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch"))); + // Re-enable the original failpoint to catch the next getMore, and release the current one. + setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn, true); + setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn, false); } - ASSERT(queryCursor); - LOGV2(20607, - "Started exhaust query with cursorId: {queryCursor_getCursorId}", - "queryCursor_getCursorId"_attr = queryCursor->getCursorId()); - return queryThread; -} -void runOneGetMore(DBClientBase* conn, - const std::unique_ptr<DBClientCursor>& queryCursor, - int nDocsReturned) { - const auto curOpMatch = BSON("command.collection" << testNSS.coll() << "command.getMore" - << queryCursor->getCursorId() << "msg" - << "waitWithPinnedCursorDuringGetMoreBatch" - << "cursor.nDocsReturned" << nDocsReturned); - // Confirm that the initial getMore appears in the $currentOp output. - ASSERT(confirmCurrentOpContents(conn, curOpMatch)); + // Test exhaust cursor is cleaned up on client disconnect. By default, the test client + // disconnects while the exhaust getMore is running. If disconnectAfterGetMoreBatch is set to + // true, the test client disconnects after the exhaust getMore is run but before the server + // sends out the response. + void testClientDisconnect(bool disconnectAfterGetMoreBatch) { + auto conn = connect(); + + // We need to set failpoints around getMore which cause it to hang, so only test against a + // single server rather than a replica set or mongoS. Mongos doesn't support exhuast at all + // and the failpoints might disrupt the completion of other commands (like monitoring hellos + // and heartbeats) for replica sets. + if (conn->isReplicaSetMember() || conn->isMongos()) { + return; + } - // Airlock the failpoint by releasing it only after we enable a post-getMore failpoint. This - // ensures that no subsequent getMores can run before we re-enable the original failpoint. - setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn, true); - setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn, false); - // Confirm that the getMore completed its batch and hit the post-getMore failpoint. - ASSERT(confirmCurrentOpContents( - conn, - BSON("command.getMore" << queryCursor->getCursorId() << "msg" - << "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch"))); + initTestCollection(conn.get()); + + // Enable a failpoint to block getMore during execution. + setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), true); + + const auto connStr = unittest::getFixtureConnectionString(); + const auto queryConnection = std::make_unique<DBClientConnection>(); + uassertStatusOK(queryConnection->connect(connStr.getServers()[0], testBackgroundAppName)); + std::unique_ptr<DBClientCursor> queryCursor; + + // Execute a query on a separate thread, with the 'exhaust' flag set. + auto queryThread = startExhaustQuery(queryConnection.get(), queryCursor); + // Ensure that, regardless of whether the test completes or fails, we release all + // failpoints. + ON_BLOCK_EXIT([&conn, &queryThread, this] { + setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false); + setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn.get(), false); + queryThread.wait(); + }); + + // This will allow the initial getMore to run. + if (_op == dbQuery) { + // We can't know how many docs to expect when using dbQuery + // because the batch size is determined by the server. + runOneGetMore(conn.get(), queryCursor, boost::none); + } else { + runOneGetMore(conn.get(), queryCursor, 2); + } - // Re-enable the original failpoint to catch the next getMore, and release the current one. - setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn, true); - setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn, false); -} + // The next getMore will be an exhaust getMore. Confirm that the exhaust getMore appears in + // the $currentOp output. + BSONObjBuilder bob; + bob.append("command.collection", testNSS.coll()); + bob.append("command.getMore", queryCursor->getCursorId()); + bob.append("msg", "waitWithPinnedCursorDuringGetMoreBatch"); + if (_op == dbMsg) { + // We can't know how many docs to expect when using dbQuery + // because the batch size is determined by the server. + bob.append("cursor.nDocsReturned", 3); + } + auto curOpMatch = bob.done(); + ASSERT(confirmCurrentOpContents(conn.get(), curOpMatch)); + + if (disconnectAfterGetMoreBatch) { + // Allow the exhaust getMore to run but block it before sending out the response. + std::string failPointMsg = + setWaitAfterGetMoreFinishesExecutionFailpoint(conn.get(), true); + setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false); + ASSERT(confirmCurrentOpContents( + conn.get(), + BSON("command.getMore" << queryCursor->getCursorId() << "msg" << failPointMsg))); + } + + // Kill the client connection while the exhaust getMore is blocked on the failpoint. + queryConnection->shutdownAndDisallowReconnect(); + LOGV2(20608, "Killed exhaust connection."); + + if (disconnectAfterGetMoreBatch) { + // Disable the failpoint to allow the exhaust getMore to continue sending out the + // response after the client disconnects. This will result in a broken pipe error. + setWaitAfterGetMoreFinishesExecutionFailpoint(conn.get(), false); + } + + curOpMatch = BSON("command.collection" << testNSS.coll() << "command.getMore" + << queryCursor->getCursorId()); + // Confirm that the exhaust getMore was interrupted and does not appear in the $currentOp + // output. + const bool expectEmptyResult = true; + ASSERT(confirmCurrentOpContents(conn.get(), curOpMatch, expectEmptyResult)); + + setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false); + setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn.get(), false); + + curOpMatch = BSON("type" + << "idleCursor" + << "cursor.cursorId" << queryCursor->getCursorId()); + // Confirm that the cursor was cleaned up and does not appear in the $currentOp idleCursor + // output. + ASSERT(confirmCurrentOpContents(conn.get(), curOpMatch, expectEmptyResult)); + } + + // Specifies the amount of time we are willing to wait for a parallel operation to complete. + Milliseconds parallelWaitTimeoutMS = Milliseconds(5 * 60 * 1000); + + // Obtain a pointer to the global system clock. Used to enforce timeouts in the parallel thread. + SystemClockSource* clock = SystemClockSource::get(); + + const NamespaceString testNSS{"exhaust_cursor_currentop.testColl"}; + + const StringData testAppName = "curop_exhaust_cursor_test"; + + const StringData testBackgroundAppName = "curop_exhaust_cursor_test_bg"; + + NetworkOp _op{dbMsg}; +}; } // namespace -TEST(CurrentOpExhaustCursorTest, CanSeeEachExhaustCursorPseudoGetMoreInCurrentOpOutput) { +TEST_F(CurrentOpExhaustCursorTestFixture, CanSeeEachExhaustCursorPseudoGetMoreInCurrentOpOutput) { auto conn = connect(); // We need to set failpoints around getMore which cause it to hang, so only test against a @@ -225,7 +373,7 @@ TEST(CurrentOpExhaustCursorTest, CanSeeEachExhaustCursorPseudoGetMoreInCurrentOp // Execute a query on a separate thread, with the 'exhaust' flag set. auto queryThread = startExhaustQuery(queryConnection.get(), queryCursor); // Ensure that, regardless of whether the test completes or fails, we release all failpoints. - ON_BLOCK_EXIT([&conn, &queryThread] { + ON_BLOCK_EXIT([&conn, &queryThread, this] { setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false); setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn.get(), false); queryThread.wait(); @@ -240,97 +388,31 @@ TEST(CurrentOpExhaustCursorTest, CanSeeEachExhaustCursorPseudoGetMoreInCurrentOp } } -// Test exhaust cursor is cleaned up on client disconnect. By default, the test client disconnects -// while the exhaust getMore is running. If disconnectAfterGetMoreBatch is set to true, the test -// client disconnects after the exhaust getMore is run but before the server sends out the response. -void testClientDisconnect(bool disconnectAfterGetMoreBatch) { - auto conn = connect(); - - // We need to set failpoints around getMore which cause it to hang, so only test against a - // single server rather than a replica set or mongoS. - if (conn->isReplicaSetMember() || conn->isMongos()) { - return; - } - - initTestCollection(conn.get()); - - // Enable a failpoint to block getMore during execution. - setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), true); - - const auto connStr = unittest::getFixtureConnectionString(); - const auto queryConnection = std::make_unique<DBClientConnection>(); - uassertStatusOK(queryConnection->connect(connStr.getServers()[0], testBackgroundAppName)); - std::unique_ptr<DBClientCursor> queryCursor; - - // Execute a query on a separate thread, with the 'exhaust' flag set. - auto queryThread = startExhaustQuery(queryConnection.get(), queryCursor); - // Ensure that, regardless of whether the test completes or fails, we release all failpoints. - ON_BLOCK_EXIT([&conn, &queryThread] { - setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false); - setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn.get(), false); - queryThread.wait(); - }); - - // This will allow the initial getMore to run. - runOneGetMore(conn.get(), queryCursor, 2); - // The next getMore will be an exhaust getMore. Confirm that the exhaust getMore appears in the - // $currentOp output. - auto curOpMatch = BSON("command.collection" << testNSS.coll() << "command.getMore" - << queryCursor->getCursorId() << "msg" - << "waitWithPinnedCursorDuringGetMoreBatch" - << "cursor.nDocsReturned" << 3); - ASSERT(confirmCurrentOpContents(conn.get(), curOpMatch)); - - if (disconnectAfterGetMoreBatch) { - // Allow the exhaust getMore to run but block it before sending out the response. - setWaitAfterCommandFinishesExecutionFailpoint(conn.get(), true); - setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false); - ASSERT(confirmCurrentOpContents(conn.get(), - BSON("command.getMore" - << queryCursor->getCursorId() << "msg" - << "waitAfterCommandFinishesExecution"))); - } - - // Kill the client connection while the exhaust getMore is blocked on the failpoint. - queryConnection->shutdownAndDisallowReconnect(); - LOGV2(20608, "Killed exhaust connection."); - - if (disconnectAfterGetMoreBatch) { - // Disable the failpoint to allow the exhaust getMore to continue sending out the response - // after the client disconnects. This will result in a broken pipe error. - setWaitAfterCommandFinishesExecutionFailpoint(conn.get(), false); - } +TEST_F(CurrentOpExhaustCursorTestFixture, InterruptExhaustCursorPseudoGetMoreOnClientDisconnect) { + // Test that an exhaust getMore is interrupted on client disconnect. + testClientDisconnect(false /* disconnectAfterGetMoreBatch */); +} - curOpMatch = BSON("command.collection" << testNSS.coll() << "command.getMore" - << queryCursor->getCursorId()); - // Confirm that the exhaust getMore was interrupted and does not appear in the $currentOp - // output. - const bool expectEmptyResult = true; - ASSERT(confirmCurrentOpContents(conn.get(), curOpMatch, expectEmptyResult)); - - setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false); - setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn.get(), false); - - curOpMatch = BSON("type" - << "idleCursor" - << "cursor.cursorId" << queryCursor->getCursorId()); - // Confirm that the cursor was cleaned up and does not appear in the $currentOp idleCursor - // output. - ASSERT(confirmCurrentOpContents(conn.get(), curOpMatch, expectEmptyResult)); +TEST_F(CurrentOpExhaustCursorTestFixture, CleanupExhaustCursorOnBrokenPipe) { + // Test that exhaust cursor is cleaned up on broken pipe even if the exhaust getMore succeeded. + testClientDisconnect(true /* disconnectAfterGetMoreBatch */); } -TEST(CurrentOpExhaustCursorTest, InterruptExhaustCursorPseudoGetMoreOnClientDisconnect) { +TEST_F(CurrentOpExhaustCursorTestFixture, + InterruptExhaustCursorPseudoGetMoreOnClientDisconnectDbQuery) { + _op = dbQuery; // Test that an exhaust getMore is interrupted on client disconnect. testClientDisconnect(false /* disconnectAfterGetMoreBatch */); } -TEST(CurrentOpExhaustCursorTest, CleanupExhaustCursorOnBrokenPipe) { +TEST_F(CurrentOpExhaustCursorTestFixture, CleanupExhaustCursorOnBrokenPipeDbQuery) { + _op = dbQuery; // Test that exhaust cursor is cleaned up on broken pipe even if the exhaust getMore succeeded. testClientDisconnect(true /* disconnectAfterGetMoreBatch */); } -TEST(CurrentOpExhaustCursorTest, ExhaustCursorUpdatesLastKnownCommittedOpTime) { +TEST_F(CurrentOpExhaustCursorTestFixture, ExhaustCursorUpdatesLastKnownCommittedOpTime) { auto fixtureConn = connect(); // We need to test the lastKnownCommittedOpTime in exhaust getMore requests. So we need a diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index c9355a84d58..16ff3dae0d1 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -113,6 +113,7 @@ MONGO_FAIL_POINT_DEFINE(skipCheckingForNotPrimaryInCommandDispatch); MONGO_FAIL_POINT_DEFINE(sleepMillisAfterCommandExecutionBegins); MONGO_FAIL_POINT_DEFINE(waitAfterNewStatementBlocksBehindPrepare); MONGO_FAIL_POINT_DEFINE(waitAfterCommandFinishesExecution); +MONGO_FAIL_POINT_DEFINE(waitAfterOpGetMoreFinishesExecution); MONGO_FAIL_POINT_DEFINE(failWithErrorCodeInRunCommand); MONGO_FAIL_POINT_DEFINE(hangBeforeSessionCheckOut); MONGO_FAIL_POINT_DEFINE(hangBeforeSettingTxnInterruptFlag); @@ -1655,6 +1656,8 @@ DbResponse receivedGetMore(OperationContext* opCtx, curop.debug().responseLength = dbresponse.response.header().dataLen(); curop.debug().nreturned = 1; *shouldLogOpDebug = true; + CurOpFailpointHelpers::waitWhileFailPointEnabled( + &waitAfterOpGetMoreFinishesExecution, opCtx, "waitAfterOpGetMoreFinishesExecution"); return dbresponse; } @@ -1667,6 +1670,8 @@ DbResponse receivedGetMore(OperationContext* opCtx, dbresponse.shouldRunAgainForExhaust = true; } + CurOpFailpointHelpers::waitWhileFailPointEnabled( + &waitAfterOpGetMoreFinishesExecution, opCtx, "waitAfterOpGetMoreFinishesExecution"); return dbresponse; } diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp index 42a633829af..057799dd435 100644 --- a/src/mongo/transport/service_state_machine.cpp +++ b/src/mongo/transport/service_state_machine.cpp @@ -684,10 +684,20 @@ void ServiceStateMachine::_cleanupExhaustResources() noexcept try { if (!_inExhaust) { return; } - auto request = OpMsgRequest::parse(_inMessage); + long long cursorId = 0; + if (_inMessage.operation() == dbGetMore) { + DbMessage dbm(_inMessage); + [[maybe_unused]] const int ntoreturn = dbm.pullInt(); + cursorId = dbm.pullInt64(); + } else { + invariant(_inMessage.operation() == dbMsg); + auto request = OpMsgRequest::parse(_inMessage); + if (request.getCommandName() == "getMore"_sd) { + cursorId = request.body["getMore"].Long(); + } + } // Clean up cursor for exhaust getMore request. - if (request.getCommandName() == "getMore"_sd) { - auto cursorId = request.body["getMore"].Long(); + if (cursorId != 0) { auto opCtx = Client::getCurrent()->makeOperationContext(); // Fire and forget. This is a best effort attempt to immediately clean up the exhaust // cursor. If the killCursors request fails here for any reasons, it will still be |