diff options
author | Lingzhi Deng <lingzhi.deng@mongodb.com> | 2019-12-10 04:54:40 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-12-10 04:54:40 +0000 |
commit | 1bbb3a2b7752ca1c6c254e78494b945597a4c72b (patch) | |
tree | cb55041764de44bcb960cf9bce28d36cc05f906f /src/mongo/db/exhaust_cursor_currentop_integration_test.cpp | |
parent | c814bdca1d3f7605abac899fd665091c85af475b (diff) | |
download | mongo-1bbb3a2b7752ca1c6c254e78494b945597a4c72b.tar.gz |
SERVER-44700: Call markKillOnClientDisconnect for exhaust cursors
Diffstat (limited to 'src/mongo/db/exhaust_cursor_currentop_integration_test.cpp')
-rw-r--r-- | src/mongo/db/exhaust_cursor_currentop_integration_test.cpp | 222 |
1 files changed, 154 insertions, 68 deletions
diff --git a/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp b/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp index ad98dcfdc35..b2f1851ec4b 100644 --- a/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp +++ b/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp @@ -29,6 +29,7 @@ #include "mongo/platform/basic.h" +#include "mongo/client/dbclient_connection.h" #include "mongo/db/query/cursor_response.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/stdx/future.h" @@ -38,21 +39,39 @@ namespace mongo { namespace { +// Specifies the amount of time we are willing to wait for a parallel operation to complete. +const auto parallelWaitTimeoutMS = Milliseconds(5 * 60 * 1000); + // Obtain a pointer to the global system clock. Used to enforce timeouts in the parallel thread. auto* const clock = SystemClockSource::get(); -std::unique_ptr<DBClientBase> connect(StringData appName) { +const NamespaceString testNSS{"exhaust_cursor_currentop.testColl"}; + +const StringData testAppName = "curop_exhaust_cursor_test"; +std::unique_ptr<DBClientBase> connect(StringData appName = testAppName) { std::string errMsg; auto conn = unittest::getFixtureConnectionString().connect(appName.toString(), errMsg); uassert(ErrorCodes::SocketException, errMsg, conn); return conn; } +const StringData testBackgroundAppName = "curop_exhaust_cursor_test_bg"; + +void initTestCollection(DBClientBase* conn) { + // Drop and recreate the test namespace. + conn->dropCollection(testNSS.ns()); + for (int i = 0; i < 10; i++) { + auto insertCmd = + BSON("insert" << testNSS.coll() << "documents" << BSON_ARRAY(BSON("a" << i))); + auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody(testNSS.db(), insertCmd)); + ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply())); + } +} void setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(DBClientBase* conn, bool enable) { auto cmdObj = BSON("configureFailPoint" << "waitWithPinnedCursorDuringGetMoreBatch" << "mode" << (enable ? "alwaysOn" : "off") << "data" - << BSON("shouldNotdropLock" << true)); + << BSON("shouldNotdropLock" << true << "shouldContinueOnInterrupt" << true)); auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody("admin", cmdObj)); ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply())); } @@ -68,56 +87,31 @@ void setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(DBClientBa bool confirmCurrentOpContents(DBClientBase* conn, BSONObj curOpMatch, - Milliseconds timeoutMS, + bool expectEmptyResult = false, + Milliseconds timeoutMS = Milliseconds(5 * 60 * 1000), Milliseconds intervalMS = Milliseconds(200)) { - auto curOpCmd = BSON( - "aggregate" << 1 << "cursor" << BSONObj() << "pipeline" - << BSON_ARRAY(BSON("$currentOp" << BSONObj()) << BSON("$match" << curOpMatch))); + auto curOpCmd = BSON("aggregate" << 1 << "cursor" << BSONObj() << "pipeline" + << BSON_ARRAY(BSON("$currentOp" << BSON("idleCursors" << true)) + << BSON("$match" << curOpMatch))); const auto startTime = clock->now(); while (clock->now() - startTime < timeoutMS) { auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody("admin", curOpCmd)); auto swCursorRes = CursorResponse::parseFromBSON(reply->getCommandReply()); ASSERT_OK(swCursorRes.getStatus()); - if (!swCursorRes.getValue().getBatch().empty()) { + if (swCursorRes.getValue().getBatch().empty() == expectEmptyResult) { return true; } sleepFor(intervalMS); } return false; } -} // namespace - -TEST(CurrentOpExhaustCursorTest, CanSeeEachExhaustCursorPseudoGetMoreInCurrentOpOutput) { - const NamespaceString testNSS{"exhaust_cursor_currentop.exhaust_cursor_currentop"}; - auto conn = connect("curop_exhaust_cursor_test"); - - // Specifies the amount of time we are willing to wait for a parallel operation to complete. - const auto parallelWaitTimeoutMS = Milliseconds(5 * 60 * 1000); - - // We need to set failpoints around getMore which cause it to hang, so only test against a - // single server rather than a replica set or mongoS. - if (conn->isReplicaSetMember() || conn->isMongos()) { - return; - } - - // Drop and recreate the test namespace. - conn->dropCollection(testNSS.ns()); - for (int i = 0; i < 10; i++) { - auto insertCmd = - BSON("insert" << testNSS.coll() << "documents" << BSON_ARRAY(BSON("a" << i))); - auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody(testNSS.db(), insertCmd)); - ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply())); - } - // Enable a failpoint to block getMore during execution. - setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), true); - - // Execute a query on a separate thread, with the 'exhaust' flag set. - std::unique_ptr<DBClientBase> queryConnection; - std::unique_ptr<DBClientCursor> queryCursor; - auto queryThread = stdx::async(stdx::launch::async, [&testNSS, &queryConnection, &queryCursor] { - queryConnection = connect("curop_exhaust_cursor_test_bg"); - auto projSpec = BSON("_id" << 0 << "a" << 1); +// Start an exhaust request with a batchSize of 2 in the initial 'find' and a batchSize of 1 in +// subsequent 'getMore's. +auto startExhaustQuery(DBClientBase* queryConnection, + std::unique_ptr<DBClientCursor>& queryCursor) { + auto queryThread = stdx::async(stdx::launch::async, [&] { + const auto projSpec = BSON("_id" << 0 << "a" << 1); // Issue the initial 'find' with a batchSize of 2 and the exhaust flag set. We then iterate // through the first batch and confirm that the results are as expected. queryCursor = queryConnection->query(testNSS, {}, 0, 0, &projSpec, QueryOption_Exhaust, 2); @@ -133,48 +127,140 @@ TEST(CurrentOpExhaustCursorTest, CanSeeEachExhaustCursorPseudoGetMoreInCurrentOp ASSERT(queryCursor->more()); }); - // Ensure that, regardless of whether the test completes or fails, we release all failpoints. - ON_BLOCK_EXIT([&conn, &queryThread] { - setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false); - setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn.get(), false); - queryThread.wait(); - }); - // Wait until the parallel operation initializes its cursor. const auto startTime = clock->now(); while (!queryCursor && (clock->now() - startTime < parallelWaitTimeoutMS)) { sleepFor(Milliseconds(10)); } ASSERT(queryCursor); + unittest::log() << "Started exhaust query with cursorId: " << queryCursor->getCursorId(); + return queryThread; +} + +void runOneGetMore(DBClientBase* conn, + const std::unique_ptr<DBClientCursor>& queryCursor, + int nDocsReturned) { + const auto curOpMatch = BSON("command.collection" << testNSS.coll() << "command.getMore" + << queryCursor->getCursorId() << "msg" + << "waitWithPinnedCursorDuringGetMoreBatch" + << "cursor.nDocsReturned" << nDocsReturned); + // Confirm that the initial getMore appears in the $currentOp output. + ASSERT(confirmCurrentOpContents(conn, curOpMatch)); + + // Airlock the failpoint by releasing it only after we enable a post-getMore failpoint. This + // ensures that no subsequent getMores can run before we re-enable the original failpoint. + setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn, true); + setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn, false); + // Confirm that the getMore completed its batch and hit the post-getMore failpoint. + ASSERT(confirmCurrentOpContents( + conn, + BSON("command.getMore" << queryCursor->getCursorId() << "msg" + << "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch"))); + + // Re-enable the original failpoint to catch the next getMore, and release the current one. + setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn, true); + setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn, false); + + ASSERT(queryCursor->more()); + // Assuming documents start with {a: 0}, the (nDocsReturned+1)-th document should have {a: + // nDocsReturned}. See initTestCollection(). + ASSERT_BSONOBJ_EQ(queryCursor->nextSafe(), BSON("a" << nDocsReturned)); +} +} // namespace + +TEST(CurrentOpExhaustCursorTest, CanSeeEachExhaustCursorPseudoGetMoreInCurrentOpOutput) { + auto conn = connect(); + + // We need to set failpoints around getMore which cause it to hang, so only test against a + // single server rather than a replica set or mongoS. + if (conn->isReplicaSetMember() || conn->isMongos()) { + return; + } + + initTestCollection(conn.get()); + + // Enable a failpoint to block getMore during execution. + setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), true); + + const auto queryConnection = connect(testBackgroundAppName); + std::unique_ptr<DBClientCursor> queryCursor; + + // Execute a query on a separate thread, with the 'exhaust' flag set. + auto queryThread = startExhaustQuery(queryConnection.get(), queryCursor); + // Ensure that, regardless of whether the test completes or fails, we release all failpoints. + ON_BLOCK_EXIT([&conn, &queryThread] { + setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false); + setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn.get(), false); + queryThread.wait(); + }); // We expect that the server, having received the first {batchSize: 1} getMore for the parallel // thread's exhaust cursor, will produce a series of pseudo-getMores internally and stream the // results back to the client until the cursor is exhausted. Here, we verify that each of these // pseudo-getMores appear in the $currentOp output. for (int i = 2; i < 10; ++i) { - // Generate a currentOp filter based on the cursorId and the cumulative nDocsReturned. - const auto curOpMatch = BSON("command.collection" - << "exhaust_cursor_currentop" - << "command.getMore" << queryCursor->getCursorId() << "msg" - << "waitWithPinnedCursorDuringGetMoreBatch" - << "cursor.nDocsReturned" << i); - - // Confirm that the exhaust getMore appears in the $currentOp output. - ASSERT(confirmCurrentOpContents(conn.get(), curOpMatch, parallelWaitTimeoutMS)); - - // Airlock the failpoint by releasing it only after we enable a post-getMore failpoint. This - // ensures that no subsequent getMores can run before we re-enable the original failpoint. - setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn.get(), true); + runOneGetMore(conn.get(), queryCursor, i); + } +} + +TEST(CurrentOpExhaustCursorTest, InterruptExhaustCursorPseudoGetMoreOnClientDisconnect) { + auto conn = connect(); + + // We need to set failpoints around getMore which cause it to hang, so only test against a + // single server rather than a replica set or mongoS. + if (conn->isReplicaSetMember() || conn->isMongos()) { + return; + } + + initTestCollection(conn.get()); + + // Enable a failpoint to block getMore during execution. + setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), true); + + const auto connStr = unittest::getFixtureConnectionString(); + const auto queryConnection = std::make_unique<DBClientConnection>(); + uassertStatusOK(queryConnection->connect(connStr.getServers()[0], testBackgroundAppName)); + std::unique_ptr<DBClientCursor> queryCursor; + + // Execute a query on a separate thread, with the 'exhaust' flag set. + auto queryThread = startExhaustQuery(queryConnection.get(), queryCursor); + // Ensure that, regardless of whether the test completes or fails, we release all failpoints. + ON_BLOCK_EXIT([&conn, &queryThread] { setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false); - // Confirm that the getMore completed its batch and hit the post-getMore failpoint. - ASSERT(confirmCurrentOpContents( - conn.get(), - BSON("command.getMore" << queryCursor->getCursorId() << "msg" - << "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch"), - parallelWaitTimeoutMS)); - // Re-enable the original failpoint to catch the next getMore, and release the current one. - setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), true); setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn.get(), false); - } + queryThread.wait(); + }); + + // This will allow the initial getMore to run. + runOneGetMore(conn.get(), queryCursor, 2); + + // The next getMore will be an exhaust getMore. Confirm that the exhaust getMore appears in the + // $currentOp output. + auto curOpMatch = BSON("command.collection" << testNSS.coll() << "command.getMore" + << queryCursor->getCursorId() << "msg" + << "waitWithPinnedCursorDuringGetMoreBatch" + << "cursor.nDocsReturned" << 3); + ASSERT(confirmCurrentOpContents(conn.get(), curOpMatch)); + + // Kill the client connection while the exhaust getMore is blocked on the failpoint. + queryConnection->shutdownAndDisallowReconnect(); + unittest::log() << "Killed exhaust connection."; + + curOpMatch = BSON("command.collection" << testNSS.coll() << "command.getMore" + << queryCursor->getCursorId()); + // Confirm that the exhaust getMore was interrupted and does not appear in the $currentOp + // output. + const bool expectEmptyResult = true; + ASSERT(confirmCurrentOpContents(conn.get(), curOpMatch, expectEmptyResult)); + + setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false); + setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn.get(), false); + + curOpMatch = BSON("type" + << "idleCursor" + << "cursor.cursorId" << queryCursor->getCursorId()); + // Confirm that the cursor was cleaned up and does not appear in the $currentOp idleCursor + // output. + ASSERT(confirmCurrentOpContents(conn.get(), curOpMatch, expectEmptyResult)); } } // namespace mongo |