From 5ebe746f1929c21588e32a91a924ac49bce59acd Mon Sep 17 00:00:00 2001 From: Romans Kasperovics Date: Fri, 25 Nov 2022 23:45:05 +0000 Subject: SERVER-71561 Protect shared variable and use notify() in exhaust_cursor_currentop_integration_test --- .../exhaust_cursor_currentop_integration_test.cpp | 78 ++++++++++++---------- 1 file changed, 43 insertions(+), 35 deletions(-) (limited to 'src/mongo/db/exhaust_cursor_currentop_integration_test.cpp') diff --git a/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp b/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp index 3bb0217b72b..a97493a9dbd 100644 --- a/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp +++ b/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp @@ -46,7 +46,7 @@ namespace mongo { namespace { // Specifies the amount of time we are willing to wait for a parallel operation to complete. -const auto parallelWaitTimeoutMS = Milliseconds(5 * 60 * 1000); +const auto parallelWaitTimeoutMS = stdx::chrono::milliseconds(5 * 60 * 1000); // Obtain a pointer to the global system clock. Used to enforce timeouts in the parallel thread. auto* const clock = SystemClockSource::get(); @@ -139,13 +139,19 @@ repl::OpTime getLastAppliedOpTime(DBClientBase* conn) { // subsequent 'getMore's. auto startExhaustQuery( DBClientBase* queryConnection, - std::unique_ptr& queryCursor, + CursorId& queryCursorId, int queryOptions = 0, Milliseconds awaitDataTimeoutMS = Milliseconds(5000), const boost::optional& lastKnownCommittedOpTime = boost::none) { + boost::optional cursorId; + auto cursorIdMutex = MONGO_MAKE_LATCH(); // Protects the 'cursorId' variable. + stdx::condition_variable cursorIdCV; // Synchronizes the threads on 'cursorId' initialization. + auto queryThread = stdx::async( stdx::launch::async, - [&queryCursor, + [&cursorId, + &cursorIdMutex, + &cursorIdCV, queryConnection, queryOptions, awaitDataTimeoutMS, @@ -164,7 +170,13 @@ auto startExhaustQuery( findCmd.setAwaitData(true); } - queryCursor = queryConnection->find(findCmd, ReadPreferenceSetting{}, ExhaustMode::kOn); + auto queryCursor = + queryConnection->find(findCmd, ReadPreferenceSetting{}, ExhaustMode::kOn); + { + stdx::lock_guard writeLock(cursorIdMutex); + cursorId = queryCursor->getCursorId(); + } + cursorIdCV.notify_one(); for (int i = 0; i < 2; ++i) { ASSERT_BSONOBJ_EQ(queryCursor->nextSafe(), BSON("a" << i)); } @@ -188,25 +200,23 @@ auto startExhaustQuery( }); // Wait until the parallel operation initializes its cursor. - const auto startTime = clock->now(); - while (!queryCursor && (clock->now() - startTime < parallelWaitTimeoutMS)) { - sleepFor(Milliseconds(10)); + { + stdx::unique_lock lk(cursorIdMutex); + cursorIdCV.wait_for(lk, parallelWaitTimeoutMS, [&] { return cursorId.has_value(); }); } - ASSERT(queryCursor); + ASSERT(cursorId); LOGV2(20607, "Started exhaust query with cursorId: {queryCursor_getCursorId}", - "queryCursor_getCursorId"_attr = queryCursor->getCursorId()); + "queryCursor_getCursorId"_attr = *cursorId); + queryCursorId = *cursorId; return queryThread; } -void runOneGetMore(DBClientBase* conn, - const std::unique_ptr& queryCursor, - int nDocsReturned) { - const auto curOpMatch = - BSON("command.collection" << testNSS.coll() << "command.getMore" - << queryCursor->getCursorId() << "failpointMsg" - << "waitWithPinnedCursorDuringGetMoreBatch" - << "cursor.nDocsReturned" << nDocsReturned); +void runOneGetMore(DBClientBase* conn, CursorId queryCursorId, int nDocsReturned) { + const auto curOpMatch = BSON("command.collection" << testNSS.coll() << "command.getMore" + << queryCursorId << "failpointMsg" + << "waitWithPinnedCursorDuringGetMoreBatch" + << "cursor.nDocsReturned" << nDocsReturned); // Confirm that the initial getMore appears in the $currentOp output. ASSERT(confirmCurrentOpContents(conn, curOpMatch)); @@ -217,7 +227,7 @@ void runOneGetMore(DBClientBase* conn, // Confirm that the getMore completed its batch and hit the post-getMore failpoint. ASSERT(confirmCurrentOpContents( conn, - BSON("command.getMore" << queryCursor->getCursorId() << "failpointMsg" + BSON("command.getMore" << queryCursorId << "failpointMsg" << "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch"))); // Re-enable the original failpoint to catch the next getMore, and release the current one. @@ -241,10 +251,10 @@ TEST(CurrentOpExhaustCursorTest, CanSeeEachExhaustCursorPseudoGetMoreInCurrentOp setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), true); const auto queryConnection = connect(testBackgroundAppName); - std::unique_ptr queryCursor; + CursorId queryCursorId; // Execute a query on a separate thread, with the 'exhaust' flag set. - auto queryThread = startExhaustQuery(queryConnection.get(), queryCursor); + auto queryThread = startExhaustQuery(queryConnection.get(), queryCursorId); // Ensure that, regardless of whether the test completes or fails, we release all failpoints. ON_BLOCK_EXIT([&conn, &queryThread] { setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false); @@ -257,7 +267,7 @@ TEST(CurrentOpExhaustCursorTest, CanSeeEachExhaustCursorPseudoGetMoreInCurrentOp // results back to the client until the cursor is exhausted. Here, we verify that each of these // pseudo-getMores appear in the $currentOp output. for (int i = 2; i < 10; ++i) { - runOneGetMore(conn.get(), queryCursor, i); + runOneGetMore(conn.get(), queryCursorId, i); } } @@ -282,10 +292,10 @@ void testClientDisconnect(bool disconnectAfterGetMoreBatch) { const auto queryConnection = std::make_unique(); uassertStatusOK( queryConnection->connect(connStr.getServers()[0], testBackgroundAppName, boost::none)); - std::unique_ptr queryCursor; + CursorId queryCursorId; // Execute a query on a separate thread, with the 'exhaust' flag set. - auto queryThread = startExhaustQuery(queryConnection.get(), queryCursor); + auto queryThread = startExhaustQuery(queryConnection.get(), queryCursorId); // Ensure that, regardless of whether the test completes or fails, we release all failpoints. ON_BLOCK_EXIT([&conn, &queryThread] { setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false); @@ -294,12 +304,12 @@ void testClientDisconnect(bool disconnectAfterGetMoreBatch) { }); // This will allow the initial getMore to run. - runOneGetMore(conn.get(), queryCursor, 2); + runOneGetMore(conn.get(), queryCursorId, 2); // The next getMore will be an exhaust getMore. Confirm that the exhaust getMore appears in the // $currentOp output. auto curOpMatch = BSON("command.collection" << testNSS.coll() << "command.getMore" - << queryCursor->getCursorId() << "failpointMsg" + << queryCursorId << "failpointMsg" << "waitWithPinnedCursorDuringGetMoreBatch" << "cursor.nDocsReturned" << 3); ASSERT(confirmCurrentOpContents(conn.get(), curOpMatch)); @@ -310,7 +320,7 @@ void testClientDisconnect(bool disconnectAfterGetMoreBatch) { setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn.get(), false); ASSERT(confirmCurrentOpContents(conn.get(), BSON("command.getMore" - << queryCursor->getCursorId() << "failpointMsg" + << queryCursorId << "failpointMsg" << "waitAfterCommandFinishesExecution"))); } @@ -324,8 +334,7 @@ void testClientDisconnect(bool disconnectAfterGetMoreBatch) { setWaitAfterCommandFinishesExecutionFailpoint(conn.get(), false); } - curOpMatch = BSON("command.collection" << testNSS.coll() << "command.getMore" - << queryCursor->getCursorId()); + curOpMatch = BSON("command.collection" << testNSS.coll() << "command.getMore" << queryCursorId); // Confirm that the exhaust getMore was interrupted and does not appear in the $currentOp // output. const bool expectEmptyResult = true; @@ -336,7 +345,7 @@ void testClientDisconnect(bool disconnectAfterGetMoreBatch) { curOpMatch = BSON("type" << "idleCursor" - << "cursor.cursorId" << queryCursor->getCursorId()); + << "cursor.cursorId" << queryCursorId); // Confirm that the cursor was cleaned up and does not appear in the $currentOp idleCursor // output. ASSERT(confirmCurrentOpContents(conn.get(), curOpMatch, expectEmptyResult)); @@ -386,7 +395,7 @@ TEST(CurrentOpExhaustCursorTest, ExhaustCursorUpdatesLastKnownCommittedOpTime) { const auto fixtureQueryConn = connect(testBackgroundAppName); DBClientBase* queryConn = &static_cast(fixtureQueryConn.get())->primaryConn(); - std::unique_ptr queryCursor; + CursorId queryCursorId; // Enable a failpoint to block getMore during execution to avoid races between getCursorId() and // receiving new batches. @@ -395,14 +404,13 @@ TEST(CurrentOpExhaustCursorTest, ExhaustCursorUpdatesLastKnownCommittedOpTime) { // Initiate a tailable awaitData exhaust cursor with lastKnownCommittedOpTime being the // lastAppliedOpTime. auto queryThread = startExhaustQuery(queryConn, - queryCursor, + queryCursorId, QueryOption_CursorTailable | QueryOption_AwaitData, Milliseconds(1000), // awaitData timeout lastAppliedOpTime); // lastKnownCommittedOpTime // Assert non-zero cursorId. - auto cursorId = queryCursor->getCursorId(); - ASSERT_NE(cursorId, 0LL); + ASSERT_NE(queryCursorId, 0LL); // Disable failpoint and allow exhaust queries to run. setWaitWithPinnedCursorDuringGetMoreBatchFailpoint(conn, false); @@ -412,7 +420,7 @@ TEST(CurrentOpExhaustCursorTest, ExhaustCursorUpdatesLastKnownCommittedOpTime) { // Test that the cursor's lastKnownCommittedOpTime is eventually advanced to the // lastAppliedOpTime. auto curOpMatch = - BSON("command.collection" << testNSS.coll() << "command.getMore" << cursorId + BSON("command.collection" << testNSS.coll() << "command.getMore" << queryCursorId << "cursor.lastKnownCommittedOpTime" << lastAppliedOpTime); ASSERT(confirmCurrentOpContents(conn, curOpMatch)); @@ -430,7 +438,7 @@ TEST(CurrentOpExhaustCursorTest, ExhaustCursorUpdatesLastKnownCommittedOpTime) { // Test that the cursor's lastKnownCommittedOpTime is eventually advanced to the // new lastAppliedOpTime. curOpMatch = - BSON("command.collection" << testNSS.coll() << "command.getMore" << cursorId + BSON("command.collection" << testNSS.coll() << "command.getMore" << queryCursorId << "cursor.lastKnownCommittedOpTime" << lastAppliedOpTime); ASSERT(confirmCurrentOpContents(conn, curOpMatch)); } -- cgit v1.2.1