author     Lingzhi Deng <lingzhi.deng@mongodb.com>  2019-12-10 04:54:40 +0000
committer  evergreen <evergreen@mongodb.com>        2019-12-10 04:54:40 +0000
commit     1bbb3a2b7752ca1c6c254e78494b945597a4c72b (patch)
tree       cb55041764de44bcb960cf9bce28d36cc05f906f /src/mongo/db/exhaust_cursor_currentop_integration_test.cpp
parent     c814bdca1d3f7605abac899fd665091c85af475b (diff)
SERVER-44700: Call markKillOnClientDisconnect for exhaust cursors
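The server-side fix itself is not part of this diff, which touches only the integration test. As background for the tests below: exhaust getMores are synthesized by the server rather than arriving as fresh client requests, so they previously never re-armed the kill-on-disconnect check that ordinary commands get. A minimal sketch of the hook named in the ticket title follows; the enclosing function and call site are hypothetical, and only markKillOnClientDisconnect() comes from the commit title.

    // Hypothetical call site (the real change lives outside this test file):
    // each server-generated exhaust pseudo-getMore re-arms the disconnect
    // check before doing any interruptible work.
    void runExhaustPseudoGetMore(OperationContext* opCtx) {
        // Without this, a pseudo-getMore blocked on a failpoint or lock
        // would outlive its disconnected client indefinitely.
        opCtx->markKillOnClientDisconnect();
        // ... execute the batch; interruptible waits can now fail with
        // ErrorCodes::ClientDisconnect once the session closes ...
    }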
Diffstat (limited to 'src/mongo/db/exhaust_cursor_currentop_integration_test.cpp')
-rw-r--r--  src/mongo/db/exhaust_cursor_currentop_integration_test.cpp | 222
1 file changed, 154 insertions(+), 68 deletions(-)
diff --git a/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp b/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp
index ad98dcfdc35..b2f1851ec4b 100644
--- a/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp
+++ b/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp
@@ -29,6 +29,7 @@
#include "mongo/platform/basic.h"
+#include "mongo/client/dbclient_connection.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/stdx/future.h"
@@ -38,21 +39,39 @@
namespace mongo {
namespace {
+// Specifies the amount of time we are willing to wait for a parallel operation to complete.
+const auto parallelWaitTimeoutMS = Milliseconds(5 * 60 * 1000);
+
// Obtain a pointer to the global system clock. Used to enforce timeouts in the parallel thread.
auto* const clock = SystemClockSource::get();
-std::unique_ptr<DBClientBase> connect(StringData appName) {
+const NamespaceString testNSS{"exhaust_cursor_currentop.testColl"};
+
+const StringData testAppName = "curop_exhaust_cursor_test";
+std::unique_ptr<DBClientBase> connect(StringData appName = testAppName) {
std::string errMsg;
auto conn = unittest::getFixtureConnectionString().connect(appName.toString(), errMsg);
uassert(ErrorCodes::SocketException, errMsg, conn);
return conn;
}
+const StringData testBackgroundAppName = "curop_exhaust_cursor_test_bg";
+
+void initTestCollection(DBClientBase* conn) {
+ // Drop and recreate the test namespace.
+ conn->dropCollection(testNSS.ns());
+ for (int i = 0; i < 10; i++) {
+ auto insertCmd =
+ BSON("insert" << testNSS.coll() << "documents" << BSON_ARRAY(BSON("a" << i)));
+ auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody(testNSS.db(), insertCmd));
+ ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply()));
+ }
+}
void setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(DBClientBase* conn, bool enable) {
auto cmdObj = BSON("configureFailPoint"
<< "waitWithPinnedCursorDuringGetMoreBatch"
<< "mode" << (enable ? "alwaysOn" : "off") << "data"
- << BSON("shouldNotdropLock" << true));
+ << BSON("shouldNotdropLock" << true << "shouldContinueOnInterrupt" << true));
auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody("admin", cmdObj));
ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply()));
}
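Both failpoint helpers in this file follow the same configureFailPoint shape and differ only in the failpoint name and data payload; a generalized form (a hypothetical helper, shown only to make the pattern explicit) would be:

    // Hypothetical generalization of the two set*Failpoint helpers: toggle
    // any named server failpoint, optionally attaching a data document.
    void setFailpoint(DBClientBase* conn, StringData name, bool enable, BSONObj data = BSONObj()) {
        auto cmdObj = BSON("configureFailPoint" << name << "mode"
                                                << (enable ? "alwaysOn" : "off") << "data" << data);
        auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody("admin", cmdObj));
        ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply()));
    }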
@@ -68,56 +87,31 @@ void setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(DBClientBa
bool confirmCurrentOpContents(DBClientBase* conn,
BSONObj curOpMatch,
- Milliseconds timeoutMS,
+ bool expectEmptyResult = false,
+ Milliseconds timeoutMS = Milliseconds(5 * 60 * 1000),
Milliseconds intervalMS = Milliseconds(200)) {
- auto curOpCmd = BSON(
- "aggregate" << 1 << "cursor" << BSONObj() << "pipeline"
- << BSON_ARRAY(BSON("$currentOp" << BSONObj()) << BSON("$match" << curOpMatch)));
+ auto curOpCmd = BSON("aggregate" << 1 << "cursor" << BSONObj() << "pipeline"
+ << BSON_ARRAY(BSON("$currentOp" << BSON("idleCursors" << true))
+ << BSON("$match" << curOpMatch)));
const auto startTime = clock->now();
while (clock->now() - startTime < timeoutMS) {
auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody("admin", curOpCmd));
auto swCursorRes = CursorResponse::parseFromBSON(reply->getCommandReply());
ASSERT_OK(swCursorRes.getStatus());
- if (!swCursorRes.getValue().getBatch().empty()) {
+ if (swCursorRes.getValue().getBatch().empty() == expectEmptyResult) {
return true;
}
sleepFor(intervalMS);
}
return false;
}
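Usage follows the same shape in both tests below: first wait for a matching operation to appear, then (in the disconnect test) reuse the same match with expectEmptyResult to wait for it to vanish. A sketch, with cursorId standing in for queryCursor->getCursorId():

    // Wait until a getMore on 'cursorId' is visible in $currentOp...
    ASSERT(confirmCurrentOpContents(conn, BSON("command.getMore" << cursorId)));
    // ...and later, until no such operation remains.
    ASSERT(confirmCurrentOpContents(conn, BSON("command.getMore" << cursorId),
                                    true /* expectEmptyResult */));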
-} // namespace
-
-TEST(CurrentOpExhaustCursorTest, CanSeeEachExhaustCursorPseudoGetMoreInCurrentOpOutput) {
- const NamespaceString testNSS{"exhaust_cursor_currentop.exhaust_cursor_currentop"};
- auto conn = connect("curop_exhaust_cursor_test");
-
- // Specifies the amount of time we are willing to wait for a parallel operation to complete.
- const auto parallelWaitTimeoutMS = Milliseconds(5 * 60 * 1000);
-
- // We need to set failpoints around getMore which cause it to hang, so only test against a
- // single server rather than a replica set or mongoS.
- if (conn->isReplicaSetMember() || conn->isMongos()) {
- return;
- }
-
- // Drop and recreate the test namespace.
- conn->dropCollection(testNSS.ns());
- for (int i = 0; i < 10; i++) {
- auto insertCmd =
- BSON("insert" << testNSS.coll() << "documents" << BSON_ARRAY(BSON("a" << i)));
- auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody(testNSS.db(), insertCmd));
- ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply()));
- }
- // Enable a failpoint to block getMore during execution.
- setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), true);
-
- // Execute a query on a separate thread, with the 'exhaust' flag set.
- std::unique_ptr<DBClientBase> queryConnection;
- std::unique_ptr<DBClientCursor> queryCursor;
- auto queryThread = stdx::async(stdx::launch::async, [&testNSS, &queryConnection, &queryCursor] {
- queryConnection = connect("curop_exhaust_cursor_test_bg");
- auto projSpec = BSON("_id" << 0 << "a" << 1);
+// Start an exhaust request with a batchSize of 2 in the initial 'find' and a batchSize of 1 in
+// subsequent 'getMore's.
+auto startExhaustQuery(DBClientBase* queryConnection,
+ std::unique_ptr<DBClientCursor>& queryCursor) {
+ auto queryThread = stdx::async(stdx::launch::async, [&] {
+ const auto projSpec = BSON("_id" << 0 << "a" << 1);
// Issue the initial 'find' with a batchSize of 2 and the exhaust flag set. We then iterate
// through the first batch and confirm that the results are as expected.
queryCursor = queryConnection->query(testNSS, {}, 0, 0, &projSpec, QueryOption_Exhaust, 2);
@@ -133,48 +127,140 @@ TEST(CurrentOpExhaustCursorTest, CanSeeEachExhaustCursorPseudoGetMoreInCurrentOp
ASSERT(queryCursor->more());
});
- // Ensure that, regardless of whether the test completes or fails, we release all failpoints.
- ON_BLOCK_EXIT([&conn, &queryThread] {
- setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false);
- setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn.get(), false);
- queryThread.wait();
- });
-
// Wait until the parallel operation initializes its cursor.
const auto startTime = clock->now();
while (!queryCursor && (clock->now() - startTime < parallelWaitTimeoutMS)) {
sleepFor(Milliseconds(10));
}
ASSERT(queryCursor);
+ unittest::log() << "Started exhaust query with cursorId: " << queryCursor->getCursorId();
+ return queryThread;
+}
+
+void runOneGetMore(DBClientBase* conn,
+ const std::unique_ptr<DBClientCursor>& queryCursor,
+ int nDocsReturned) {
+ const auto curOpMatch = BSON("command.collection" << testNSS.coll() << "command.getMore"
+ << queryCursor->getCursorId() << "msg"
+ << "waitWithPinnedCursorDuringGetMoreBatch"
+ << "cursor.nDocsReturned" << nDocsReturned);
+ // Confirm that the initial getMore appears in the $currentOp output.
+ ASSERT(confirmCurrentOpContents(conn, curOpMatch));
+
+ // Airlock the failpoint by releasing it only after we enable a post-getMore failpoint. This
+ // ensures that no subsequent getMores can run before we re-enable the original failpoint.
+ setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn, true);
+ setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn, false);
+ // Confirm that the getMore completed its batch and hit the post-getMore failpoint.
+ ASSERT(confirmCurrentOpContents(
+ conn,
+ BSON("command.getMore" << queryCursor->getCursorId() << "msg"
+ << "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch")));
+
+ // Re-enable the original failpoint to catch the next getMore, and release the current one.
+ setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn, true);
+ setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn, false);
+
+ ASSERT(queryCursor->more());
+ // Assuming documents start with {a: 0}, the (nDocsReturned+1)-th document should have {a:
+ // nDocsReturned}. See initTestCollection().
+ ASSERT_BSONOBJ_EQ(queryCursor->nextSafe(), BSON("a" << nDocsReturned));
+}
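Once both failpoints are finally released, any documents still buffered in the exhaust stream can be drained with ordinary cursor iteration; a sketch follows (the tests instead keep the failpoints armed and step one batch at a time via runOneGetMore):

    // Drain the remainder of the exhaust stream; more() pulls further
    // server-pushed batches off the wire as needed.
    while (queryCursor->more()) {
        unittest::log() << "exhaust doc: " << queryCursor->nextSafe();
    }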
+} // namespace
+
+TEST(CurrentOpExhaustCursorTest, CanSeeEachExhaustCursorPseudoGetMoreInCurrentOpOutput) {
+ auto conn = connect();
+
+ // We need to set failpoints around getMore which cause it to hang, so only test against a
+ // single server rather than a replica set or mongoS.
+ if (conn->isReplicaSetMember() || conn->isMongos()) {
+ return;
+ }
+
+ initTestCollection(conn.get());
+
+ // Enable a failpoint to block getMore during execution.
+ setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), true);
+
+ const auto queryConnection = connect(testBackgroundAppName);
+ std::unique_ptr<DBClientCursor> queryCursor;
+
+ // Execute a query on a separate thread, with the 'exhaust' flag set.
+ auto queryThread = startExhaustQuery(queryConnection.get(), queryCursor);
+ // Ensure that, regardless of whether the test completes or fails, we release all failpoints.
+ ON_BLOCK_EXIT([&conn, &queryThread] {
+ setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false);
+ setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn.get(), false);
+ queryThread.wait();
+ });
// We expect that the server, having received the first {batchSize: 1} getMore for the parallel
// thread's exhaust cursor, will produce a series of pseudo-getMores internally and stream the
// results back to the client until the cursor is exhausted. Here, we verify that each of these
// pseudo-getMores appear in the $currentOp output.
for (int i = 2; i < 10; ++i) {
- // Generate a currentOp filter based on the cursorId and the cumulative nDocsReturned.
- const auto curOpMatch = BSON("command.collection"
- << "exhaust_cursor_currentop"
- << "command.getMore" << queryCursor->getCursorId() << "msg"
- << "waitWithPinnedCursorDuringGetMoreBatch"
- << "cursor.nDocsReturned" << i);
-
- // Confirm that the exhaust getMore appears in the $currentOp output.
- ASSERT(confirmCurrentOpContents(conn.get(), curOpMatch, parallelWaitTimeoutMS));
-
- // Airlock the failpoint by releasing it only after we enable a post-getMore failpoint. This
- // ensures that no subsequent getMores can run before we re-enable the original failpoint.
- setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn.get(), true);
+ runOneGetMore(conn.get(), queryCursor, i);
+ }
+}
+
+TEST(CurrentOpExhaustCursorTest, InterruptExhaustCursorPseudoGetMoreOnClientDisconnect) {
+ auto conn = connect();
+
+ // We need to set failpoints around getMore which cause it to hang, so only test against a
+ // single server rather than a replica set or mongoS.
+ if (conn->isReplicaSetMember() || conn->isMongos()) {
+ return;
+ }
+
+ initTestCollection(conn.get());
+
+ // Enable a failpoint to block getMore during execution.
+ setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), true);
+
+ const auto connStr = unittest::getFixtureConnectionString();
+ const auto queryConnection = std::make_unique<DBClientConnection>();
+ uassertStatusOK(queryConnection->connect(connStr.getServers()[0], testBackgroundAppName));
+ std::unique_ptr<DBClientCursor> queryCursor;
+
+ // Execute a query on a separate thread, with the 'exhaust' flag set.
+ auto queryThread = startExhaustQuery(queryConnection.get(), queryCursor);
+ // Ensure that, regardless of whether the test completes or fails, we release all failpoints.
+ ON_BLOCK_EXIT([&conn, &queryThread] {
setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false);
- // Confirm that the getMore completed its batch and hit the post-getMore failpoint.
- ASSERT(confirmCurrentOpContents(
- conn.get(),
- BSON("command.getMore" << queryCursor->getCursorId() << "msg"
- << "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch"),
- parallelWaitTimeoutMS));
- // Re-enable the original failpoint to catch the next getMore, and release the current one.
- setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), true);
setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn.get(), false);
- }
+ queryThread.wait();
+ });
+
+ // This will allow the initial getMore to run.
+ runOneGetMore(conn.get(), queryCursor, 2);
+
+ // The next getMore will be an exhaust getMore. Confirm that the exhaust getMore appears in the
+ // $currentOp output.
+ auto curOpMatch = BSON("command.collection" << testNSS.coll() << "command.getMore"
+ << queryCursor->getCursorId() << "msg"
+ << "waitWithPinnedCursorDuringGetMoreBatch"
+ << "cursor.nDocsReturned" << 3);
+ ASSERT(confirmCurrentOpContents(conn.get(), curOpMatch));
+
+ // Kill the client connection while the exhaust getMore is blocked on the failpoint.
+ queryConnection->shutdownAndDisallowReconnect();
+ unittest::log() << "Killed exhaust connection.";
+
+ curOpMatch = BSON("command.collection" << testNSS.coll() << "command.getMore"
+ << queryCursor->getCursorId());
+ // Confirm that the exhaust getMore was interrupted and does not appear in the $currentOp
+ // output.
+ const bool expectEmptyResult = true;
+ ASSERT(confirmCurrentOpContents(conn.get(), curOpMatch, expectEmptyResult));
+
+ setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false);
+ setWaitBeforeUnpinningOrDeletingCursorAfterGetMoreBatchFailpoint(conn.get(), false);
+
+ curOpMatch = BSON("type"
+ << "idleCursor"
+ << "cursor.cursorId" << queryCursor->getCursorId());
+ // Confirm that the cursor was cleaned up and does not appear in the $currentOp idleCursor
+ // output.
+ ASSERT(confirmCurrentOpContents(conn.get(), curOpMatch, expectEmptyResult));
}
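The closing assertion leans on $currentOp's idleCursors mode, which reports cursors not currently pinned to an operation. Listing the idle cursors for the test collection directly (a sketch reusing the aggregate shape from confirmCurrentOpContents; the "ns" match field is an assumption about the idleCursor entry layout) would look like:

    // Sketch: enumerate idle cursors on the test namespace via $currentOp.
    auto cmd = BSON("aggregate" << 1 << "cursor" << BSONObj() << "pipeline"
                                << BSON_ARRAY(BSON("$currentOp" << BSON("idleCursors" << true))
                                              << BSON("$match"
                                                      << BSON("type" << "idleCursor"
                                                                     << "ns" << testNSS.ns()))));
    auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody("admin", cmd));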
} // namespace mongo