summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Romans Kasperovics <romans.kasperovics@mongodb.com> 2022-11-25 23:45:05 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-11-26 00:19:44 +0000
commit: 5ebe746f1929c21588e32a91a924ac49bce59acd (patch)
tree: f9c959f750fe3926228d2de0b6a2fddd1add264f
parent: 5f7a95abeefa77b1ae9ad9918b3319ec107e2364 (diff)
download: mongo-5ebe746f1929c21588e32a91a924ac49bce59acd.tar.gz
SERVER-71561 Protect shared variable and use notify() in exhaust_cursor_currentop_integration_test
-rw-r--r-- src/mongo/db/exhaust_cursor_currentop_integration_test.cpp 78
1 files changed, 43 insertions, 35 deletions
diff --git a/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp b/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp
index 3bb0217b72b..a97493a9dbd 100644
--- a/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp
+++ b/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp
@@ -46,7 +46,7 @@
namespace mongo {
namespace {
// Specifies the amount of time we are willing to wait for a parallel operation to complete.
-const auto parallelWaitTimeoutMS = Milliseconds(5 * 60 * 1000);
+const auto parallelWaitTimeoutMS = stdx::chrono::milliseconds(5 * 60 * 1000);
// Obtain a pointer to the global system clock. Used to enforce timeouts in the parallel thread.
auto* const clock = SystemClockSource::get();
@@ -139,13 +139,19 @@ repl::OpTime getLastAppliedOpTime(DBClientBase* conn) {
// subsequent 'getMore's.
auto startExhaustQuery(
DBClientBase* queryConnection,
- std::unique_ptr<DBClientCursor>& queryCursor,
+ CursorId& queryCursorId,
int queryOptions = 0,
Milliseconds awaitDataTimeoutMS = Milliseconds(5000),
const boost::optional<repl::OpTime>& lastKnownCommittedOpTime = boost::none) {
+ boost::optional<CursorId> cursorId;
+ auto cursorIdMutex = MONGO_MAKE_LATCH(); // Protects the 'cursorId' variable.
+ stdx::condition_variable cursorIdCV; // Synchronizes the threads on 'cursorId' initialization.
+
auto queryThread = stdx::async(
stdx::launch::async,
- [&queryCursor,
+ [&cursorId,
+ &cursorIdMutex,
+ &cursorIdCV,
queryConnection,
queryOptions,
awaitDataTimeoutMS,
@@ -164,7 +170,13 @@ auto startExhaustQuery(
findCmd.setAwaitData(true);
}
- queryCursor = queryConnection->find(findCmd, ReadPreferenceSetting{}, ExhaustMode::kOn);
+ auto queryCursor =
+ queryConnection->find(findCmd, ReadPreferenceSetting{}, ExhaustMode::kOn);
+ {
+ stdx::lock_guard writeLock(cursorIdMutex);
+ cursorId = queryCursor->getCursorId();
+ }
+ cursorIdCV.notify_one();
for (int i = 0; i < 2; ++i) {
ASSERT_BSONOBJ_EQ(queryCursor->nextSafe(), BSON("a" << i));
}
@@ -188,25 +200,23 @@ auto startExhaustQuery(
});
// Wait until the parallel operation initializes its cursor.
- const auto startTime = clock->now();
- while (!queryCursor && (clock->now() - startTime < parallelWaitTimeoutMS)) {
- sleepFor(Milliseconds(10));
+ {
+ stdx::unique_lock<Latch> lk(cursorIdMutex);
+ cursorIdCV.wait_for(lk, parallelWaitTimeoutMS, [&] { return cursorId.has_value(); });
}
- ASSERT(queryCursor);
+ ASSERT(cursorId);
LOGV2(20607,
"Started exhaust query with cursorId: {queryCursor_getCursorId}",
- "queryCursor_getCursorId"_attr = queryCursor->getCursorId());
+ "queryCursor_getCursorId"_attr = *cursorId);
+ queryCursorId = *cursorId;
return queryThread;
}
-void runOneGetMore(DBClientBase* conn,
- const std::unique_ptr<DBClientCursor>& queryCursor,
- int nDocsReturned) {
- const auto curOpMatch =
- BSON("command.collection" << testNSS.coll() << "command.getMore"
- << queryCursor->getCursorId() << "failpointMsg"
- << "waitWithPinnedCursorDuringGetMoreBatch"
- << "cursor.nDocsReturned" << nDocsReturned);
+void runOneGetMore(DBClientBase* conn, CursorId queryCursorId, int nDocsReturned) {
+ const auto curOpMatch = BSON("command.collection" << testNSS.coll() << "command.getMore"
+ << queryCursorId << "failpointMsg"
+ << "waitWithPinnedCursorDuringGetMoreBatch"
+ << "cursor.nDocsReturned" << nDocsReturned);
// Confirm that the initial getMore appears in the $currentOp output.
ASSERT(confirmCurrentOpContents(conn, curOpMatch));
@@ -217,7 +227,7 @@ void runOneGetMore(DBClientBase* conn,
// Confirm that the getMore completed its batch and hit the post-getMore failpoint.
ASSERT(confirmCurrentOpContents(
conn,
- BSON("command.getMore" << queryCursor->getCursorId() << "failpointMsg"
+ BSON("command.getMore" << queryCursorId << "failpointMsg"
<< "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch")));
// Re-enable the original failpoint to catch the next getMore, and release the current one.
@@ -241,10 +251,10 @@ TEST(CurrentOpExhaustCursorTest, CanSeeEachExhaustCursorPseudoGetMoreInCurrentOp
setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), true);
const auto queryConnection = connect(testBackgroundAppName);
- std::unique_ptr<DBClientCursor> queryCursor;
+ CursorId queryCursorId;
// Execute a query on a separate thread, with the 'exhaust' flag set.
- auto queryThread = startExhaustQuery(queryConnection.get(), queryCursor);
+ auto queryThread = startExhaustQuery(queryConnection.get(), queryCursorId);
// Ensure that, regardless of whether the test completes or fails, we release all failpoints.
ON_BLOCK_EXIT([&conn, &queryThread] {
setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false);
@@ -257,7 +267,7 @@ TEST(CurrentOpExhaustCursorTest, CanSeeEachExhaustCursorPseudoGetMoreInCurrentOp
// results back to the client until the cursor is exhausted. Here, we verify that each of these
// pseudo-getMores appear in the $currentOp output.
for (int i = 2; i < 10; ++i) {
- runOneGetMore(conn.get(), queryCursor, i);
+ runOneGetMore(conn.get(), queryCursorId, i);
}
}
@@ -282,10 +292,10 @@ void testClientDisconnect(bool disconnectAfterGetMoreBatch) {
const auto queryConnection = std::make_unique<DBClientConnection>();
uassertStatusOK(
queryConnection->connect(connStr.getServers()[0], testBackgroundAppName, boost::none));
- std::unique_ptr<DBClientCursor> queryCursor;
+ CursorId queryCursorId;
// Execute a query on a separate thread, with the 'exhaust' flag set.
- auto queryThread = startExhaustQuery(queryConnection.get(), queryCursor);
+ auto queryThread = startExhaustQuery(queryConnection.get(), queryCursorId);
// Ensure that, regardless of whether the test completes or fails, we release all failpoints.
ON_BLOCK_EXIT([&conn, &queryThread] {
setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false);
@@ -294,12 +304,12 @@ void testClientDisconnect(bool disconnectAfterGetMoreBatch) {
});
// This will allow the initial getMore to run.
- runOneGetMore(conn.get(), queryCursor, 2);
+ runOneGetMore(conn.get(), queryCursorId, 2);
// The next getMore will be an exhaust getMore. Confirm that the exhaust getMore appears in the
// $currentOp output.
auto curOpMatch = BSON("command.collection" << testNSS.coll() << "command.getMore"
- << queryCursor->getCursorId() << "failpointMsg"
+ << queryCursorId << "failpointMsg"
<< "waitWithPinnedCursorDuringGetMoreBatch"
<< "cursor.nDocsReturned" << 3);
ASSERT(confirmCurrentOpContents(conn.get(), curOpMatch));
@@ -310,7 +320,7 @@ void testClientDisconnect(bool disconnectAfterGetMoreBatch) {
setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false);
ASSERT(confirmCurrentOpContents(conn.get(),
BSON("command.getMore"
- << queryCursor->getCursorId() << "failpointMsg"
+ << queryCursorId << "failpointMsg"
<< "waitAfterCommandFinishesExecution")));
}
@@ -324,8 +334,7 @@ void testClientDisconnect(bool disconnectAfterGetMoreBatch) {
setWaitAfterCommandFinishesExecutionFailpoint(conn.get(), false);
}
- curOpMatch = BSON("command.collection" << testNSS.coll() << "command.getMore"
- << queryCursor->getCursorId());
+ curOpMatch = BSON("command.collection" << testNSS.coll() << "command.getMore" << queryCursorId);
// Confirm that the exhaust getMore was interrupted and does not appear in the $currentOp
// output.
const bool expectEmptyResult = true;
@@ -336,7 +345,7 @@ void testClientDisconnect(bool disconnectAfterGetMoreBatch) {
curOpMatch = BSON("type"
<< "idleCursor"
- << "cursor.cursorId" << queryCursor->getCursorId());
+ << "cursor.cursorId" << queryCursorId);
// Confirm that the cursor was cleaned up and does not appear in the $currentOp idleCursor
// output.
ASSERT(confirmCurrentOpContents(conn.get(), curOpMatch, expectEmptyResult));
@@ -386,7 +395,7 @@ TEST(CurrentOpExhaustCursorTest, ExhaustCursorUpdatesLastKnownCommittedOpTime) {
const auto fixtureQueryConn = connect(testBackgroundAppName);
DBClientBase* queryConn =
&static_cast<DBClientReplicaSet*>(fixtureQueryConn.get())->primaryConn();
- std::unique_ptr<DBClientCursor> queryCursor;
+ CursorId queryCursorId;
// Enable a failpoint to block getMore during execution to avoid races between getCursorId() and
// receiving new batches.
@@ -395,14 +404,13 @@ TEST(CurrentOpExhaustCursorTest, ExhaustCursorUpdatesLastKnownCommittedOpTime) {
// Initiate a tailable awaitData exhaust cursor with lastKnownCommittedOpTime being the
// lastAppliedOpTime.
auto queryThread = startExhaustQuery(queryConn,
- queryCursor,
+ queryCursorId,
QueryOption_CursorTailable | QueryOption_AwaitData,
Milliseconds(1000), // awaitData timeout
lastAppliedOpTime); // lastKnownCommittedOpTime
// Assert non-zero cursorId.
- auto cursorId = queryCursor->getCursorId();
- ASSERT_NE(cursorId, 0LL);
+ ASSERT_NE(queryCursorId, 0LL);
// Disable failpoint and allow exhaust queries to run.
setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn, false);
@@ -412,7 +420,7 @@ TEST(CurrentOpExhaustCursorTest, ExhaustCursorUpdatesLastKnownCommittedOpTime) {
// Test that the cursor's lastKnownCommittedOpTime is eventually advanced to the
// lastAppliedOpTime.
auto curOpMatch =
- BSON("command.collection" << testNSS.coll() << "command.getMore" << cursorId
+ BSON("command.collection" << testNSS.coll() << "command.getMore" << queryCursorId
<< "cursor.lastKnownCommittedOpTime" << lastAppliedOpTime);
ASSERT(confirmCurrentOpContents(conn, curOpMatch));
@@ -430,7 +438,7 @@ TEST(CurrentOpExhaustCursorTest, ExhaustCursorUpdatesLastKnownCommittedOpTime) {
// Test that the cursor's lastKnownCommittedOpTime is eventually advanced to the
// new lastAppliedOpTime.
curOpMatch =
- BSON("command.collection" << testNSS.coll() << "command.getMore" << cursorId
+ BSON("command.collection" << testNSS.coll() << "command.getMore" << queryCursorId
<< "cursor.lastKnownCommittedOpTime" << lastAppliedOpTime);
ASSERT(confirmCurrentOpContents(conn, curOpMatch));
}