From d628d67126fb51cebf8960f4d73816f94f2cb99d Mon Sep 17 00:00:00 2001
From: Lingzhi Deng
Date: Mon, 6 Jan 2020 15:42:38 +0000
Subject: SERVER-44707: Store lastKnownCommittedOpTime in ClientCursor for
 exhaust getMore

---
 src/mongo/db/SConscript                            |  1 +
 src/mongo/db/clientcursor.cpp                      |  1 +
 src/mongo/db/clientcursor.h                        | 22 ++++-
 src/mongo/db/commands/getmore_cmd.cpp              | 18 +++-
 .../exhaust_cursor_currentop_integration_test.cpp  | 98 +++++++++++++++++++++-
 src/mongo/db/generic_cursor.idl                    |  6 ++
 src/mongo/db/query/plan_executor_impl.cpp          |  9 +-
 7 files changed, 144 insertions(+), 11 deletions(-)

(limited to 'src/mongo/db')

diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 18f5e8e6795..18fbf1e190b 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1664,6 +1664,7 @@ env.Library(
     ],
     LIBDEPS=[
         'service_context',
+        '$BUILD_DIR/mongo/db/repl/optime',
         '$BUILD_DIR/mongo/idl/idl_parser',
     ],
 )
diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp
index 2e87deb2fd0..7bee0fb09f9 100644
--- a/src/mongo/db/clientcursor.cpp
+++ b/src/mongo/db/clientcursor.cpp
@@ -152,6 +152,7 @@ GenericCursor ClientCursor::toGenericCursor() const {
     if (auto opCtx = _operationUsingCursor) {
         gc.setOperationUsingCursorId(opCtx->getOpID());
     }
+    gc.setLastKnownCommittedOpTime(_lastKnownCommittedOpTime);
     return gc;
 }
 
diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h
index 2f5663012d8..7496c9e08b7 100644
--- a/src/mongo/db/clientcursor.h
+++ b/src/mongo/db/clientcursor.h
@@ -282,10 +282,6 @@ public:
      */
     GenericCursor toGenericCursor() const;
 
-    //
-    // Timing.
-    //
-
     /**
      * Returns the amount of time execution time available to this cursor. Only valid at the
      * beginning of a getMore request, and only really for use by the maxTime tracking code.
@@ -306,6 +302,20 @@ public:
         _leftoverMaxTimeMicros = leftoverMaxTimeMicros;
     }
 
+    /**
+     * Returns the commit point at the time the last batch was returned.
+     */
+    boost::optional<repl::OpTime> getLastKnownCommittedOpTime() const {
+        return _lastKnownCommittedOpTime;
+    }
+
+    /**
+     * Sets the commit point at the time the last batch was returned.
+     */
+    void setLastKnownCommittedOpTime(boost::optional<repl::OpTime> lastCommittedOpTime) {
+        _lastKnownCommittedOpTime = std::move(lastCommittedOpTime);
+    }
+
     /**
      * Returns the server-wide the count of living cursors. Such a cursor is called an "open
      * cursor".
@@ -443,6 +453,10 @@ private:
 
     // A string with the plan summary of the cursor's query.
     std::string _planSummary;
+
+    // Commit point at the time the last batch was returned. This is only used by internal exhaust
+    // oplog fetching. Also see lastKnownCommittedOpTime in GetMoreRequest.
+    boost::optional<repl::OpTime> _lastKnownCommittedOpTime;
 };
 
 /**
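
The accessor pair added to ClientCursor above uses the take-by-value-then-move pattern for an optional member. A minimal, self-contained sketch of the same shape; OpTimeLike and CursorSketch are toy stand-ins for repl::OpTime and ClientCursor, not names from this patch:

    #include <boost/optional.hpp>
    #include <utility>

    // Toy stand-in for repl::OpTime, only so the sketch compiles on its own.
    struct OpTimeLike {
        long long term = -1;
        unsigned long long timestamp = 0;
    };

    class CursorSketch {
    public:
        // Returns the cached commit point, or boost::none if no batch has been returned yet.
        boost::optional<OpTimeLike> getLastKnownCommittedOpTime() const {
            return _lastKnownCommittedOpTime;
        }

        // Taking the optional by value lets callers hand in a temporary (moved, no copy)
        // or an lvalue (copied once), mirroring the setter added to ClientCursor above.
        void setLastKnownCommittedOpTime(boost::optional<OpTimeLike> opTime) {
            _lastKnownCommittedOpTime = std::move(opTime);
        }

    private:
        boost::optional<OpTimeLike> _lastKnownCommittedOpTime;
    };
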
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index 1ef70f0d2d0..1374cb1715d 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -563,9 +563,15 @@ public:
 
         // Mark this as an AwaitData operation if appropriate.
        if (cursorPin->isAwaitData() && !disableAwaitDataFailpointActive) {
-            if (_request.lastKnownCommittedOpTime)
-                clientsLastKnownCommittedOpTime(opCtx) =
-                    _request.lastKnownCommittedOpTime.get();
+            auto lastKnownCommittedOpTime = _request.lastKnownCommittedOpTime;
+            if (opCtx->isExhaust() && cursorPin->getLastKnownCommittedOpTime()) {
+                // Use the commit point of the last batch for exhaust cursors.
+                lastKnownCommittedOpTime = cursorPin->getLastKnownCommittedOpTime();
+            }
+            if (lastKnownCommittedOpTime) {
+                clientsLastKnownCommittedOpTime(opCtx) = lastKnownCommittedOpTime.get();
+            }
+
             awaitDataState(opCtx).shouldWaitForInserts = true;
         }
 
@@ -622,6 +628,12 @@ public:
             cursorPin->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros());
             cursorPin->incNReturnedSoFar(numResults);
             cursorPin->incNBatches();
+
+            if (opCtx->isExhaust() && !clientsLastKnownCommittedOpTime(opCtx).isNull()) {
+                // Set the commit point of the latest batch.
+                auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+                cursorPin->setLastKnownCommittedOpTime(replCoord->getLastCommittedOpTime());
+            }
         } else {
             curOp->debug().cursorExhausted = true;
         }
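
The getMore hunk above boils down to one selection rule: an exhaust getMore carries no fresh request from the client, so the commit point cached on the cursor after the previous batch takes precedence over whatever the original request supplied. A dependency-free sketch of that rule; the function name here is illustrative, not something the patch introduces:

    #include <boost/optional.hpp>

    // Picks the lastKnownCommittedOpTime an awaitData getMore should wait against.
    // For exhaust getMores the server generates the next batch itself, so the value
    // cached on the cursor from the previous batch is the freshest one available.
    template <typename OpTime>
    boost::optional<OpTime> chooseLastKnownCommittedOpTime(
        bool isExhaust,
        const boost::optional<OpTime>& fromRequest,
        const boost::optional<OpTime>& cachedOnCursor) {
        if (isExhaust && cachedOnCursor) {
            return cachedOnCursor;
        }
        return fromRequest;
    }
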
diff --git a/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp b/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp
index 45a0a0b107f..e8fff744558 100644
--- a/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp
+++ b/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp
@@ -30,6 +30,7 @@
 #include "mongo/platform/basic.h"
 
 #include "mongo/client/dbclient_connection.h"
+#include "mongo/client/dbclient_rs.h"
 #include "mongo/db/query/cursor_response.h"
 #include "mongo/rpc/get_status_from_command_result.h"
 #include "mongo/stdx/future.h"
@@ -115,15 +116,28 @@ bool confirmCurrentOpContents(DBClientBase* conn,
     return false;
 }
 
+repl::OpTime getLastAppliedOpTime(DBClientBase* conn) {
+    auto reply =
+        conn->runCommand(OpMsgRequest::fromDBAndBody("admin", BSON("replSetGetStatus" << 1)));
+    ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply()));
+    auto lastAppliedOpTime = reply->getCommandReply()["optimes"]["appliedOpTime"];
+    return repl::OpTime(lastAppliedOpTime["ts"].timestamp(), lastAppliedOpTime["t"].numberLong());
+}
+
 // Start an exhaust request with a batchSize of 2 in the initial 'find' and a batchSize of 1 in
 // subsequent 'getMore's.
-auto startExhaustQuery(DBClientBase* queryConnection,
-                       std::unique_ptr<DBClientCursor>& queryCursor) {
+auto startExhaustQuery(
+    DBClientBase* queryConnection,
+    std::unique_ptr<DBClientCursor>& queryCursor,
+    int queryOptions = 0,
+    Milliseconds awaitDataTimeoutMS = Milliseconds(5000),
+    const boost::optional<repl::OpTime>& lastKnownCommittedOpTime = boost::none) {
+    queryOptions = queryOptions | QueryOption_Exhaust;
     auto queryThread = stdx::async(stdx::launch::async, [&] {
         const auto projSpec = BSON("_id" << 0 << "a" << 1);
         // Issue the initial 'find' with a batchSize of 2 and the exhaust flag set. We then iterate
         // through the first batch and confirm that the results are as expected.
-        queryCursor = queryConnection->query(testNSS, {}, 0, 0, &projSpec, QueryOption_Exhaust, 2);
+        queryCursor = queryConnection->query(testNSS, {}, 0, 0, &projSpec, queryOptions, 2);
         for (int i = 0; i < 2; ++i) {
             ASSERT_BSONOBJ_EQ(queryCursor->nextSafe(), BSON("a" << i));
         }
@@ -133,6 +147,13 @@ auto startExhaustQuery(DBClientBase* queryConnection,
         // until the cursor is exhausted, without the client sending any further getMore requests.
         // We expect this request to hang at the 'waitWithPinnedCursorDuringGetMoreBatch' failpoint.
         queryCursor->setBatchSize(1);
+        if ((queryOptions & QueryOption_CursorTailable) && (queryOptions & QueryOption_AwaitData)) {
+            queryCursor->setAwaitDataTimeoutMS(awaitDataTimeoutMS);
+            if (lastKnownCommittedOpTime) {
+                auto term = lastKnownCommittedOpTime.get().getTerm();
+                queryCursor->setCurrentTermAndLastCommittedOpTime(term, lastKnownCommittedOpTime);
+            }
+        }
         ASSERT(queryCursor->more());
     });
 
@@ -301,4 +322,75 @@ TEST(CurrentOpExhaustCursorTest, CleanupExhaustCursorOnBrokenPipe) {
     // Test that exhaust cursor is cleaned up on broken pipe even if the exhaust getMore succeeded.
     testClientDisconnect(true /* disconnectAfterGetMoreBatch */);
 }
+
+TEST(CurrentOpExhaustCursorTest, ExhaustCursorUpdatesLastKnownCommittedOpTime) {
+    auto fixtureConn = connect();
+
+    // We need to test the lastKnownCommittedOpTime in exhaust getMore requests. So we need a
+    // replica set.
+    if (!fixtureConn->isReplicaSetMember()) {
+        return;
+    }
+
+    // Connect directly to the primary.
+    DBClientBase* conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn();
+    ASSERT(conn);
+
+    conn->dropCollection(testNSS.ns());
+    // Create a capped collection to run tailable awaitData queries on.
+    conn->createCollection(testNSS.ns(),
+                           1024 /* size of collection */,
+                           true /* capped */,
+                           10 /* max number of objects */);
+    // Insert initial records into the capped collection.
+    for (int i = 0; i < 5; i++) {
+        auto insertCmd =
+            BSON("insert" << testNSS.coll() << "documents" << BSON_ARRAY(BSON("a" << i)));
+        auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody(testNSS.db(), insertCmd));
+        ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply()));
+    }
+
+    // Get the lastAppliedOpTime after the initial inserts.
+    auto lastAppliedOpTime = getLastAppliedOpTime(conn);
+
+    // Create a new connection to the primary for the exhaust query.
+    const auto fixtureQueryConn = connect(testBackgroundAppName);
+    DBClientBase* queryConn =
+        &static_cast<DBClientReplicaSet*>(fixtureQueryConn.get())->masterConn();
+    std::unique_ptr<DBClientCursor> queryCursor;
+
+    // Initiate a tailable awaitData exhaust cursor with lastKnownCommittedOpTime being the
+    // lastAppliedOpTime.
+    auto queryThread = startExhaustQuery(queryConn,
+                                         queryCursor,
+                                         QueryOption_CursorTailable | QueryOption_AwaitData,
+                                         Milliseconds(1000),  // awaitData timeout
+                                         lastAppliedOpTime);  // lastKnownCommittedOpTime
+    ON_BLOCK_EXIT([&conn, &queryThread] { queryThread.wait(); });
+
+    // Test that the cursor's lastKnownCommittedOpTime is eventually advanced to the
+    // lastAppliedOpTime.
+    auto curOpMatch = BSON("command.collection"
+                           << testNSS.coll() << "command.getMore" << queryCursor->getCursorId()
+                           << "cursor.lastKnownCommittedOpTime" << lastAppliedOpTime);
+    ASSERT(confirmCurrentOpContents(conn, curOpMatch));
+
+    // Inserting more records to unblock awaitData and advance the commit point.
+    for (int i = 5; i < 8; i++) {
+        auto insertCmd =
+            BSON("insert" << testNSS.coll() << "documents" << BSON_ARRAY(BSON("a" << i)));
+        auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody(testNSS.db(), insertCmd));
+        ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply()));
+    }
+
+    // Get the new lastAppliedOpTime after the inserts.
+    lastAppliedOpTime = getLastAppliedOpTime(conn);
+
+    // Test that the cursor's lastKnownCommittedOpTime is eventually advanced to the
+    // new lastAppliedOpTime.
+    curOpMatch = BSON("command.collection"
+                      << testNSS.coll() << "command.getMore" << queryCursor->getCursorId()
+                      << "cursor.lastKnownCommittedOpTime" << lastAppliedOpTime);
+    ASSERT(confirmCurrentOpContents(conn, curOpMatch));
+}
 }  // namespace mongo
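
The new test asserts that the cursor's lastKnownCommittedOpTime advances eventually rather than immediately, so the currentOp check has to tolerate a delay. A generic poll-until-deadline helper of the kind a check such as confirmCurrentOpContents (whose body is outside this excerpt) can be built on, purely as an illustration:

    #include <chrono>
    #include <functional>
    #include <thread>

    // Retries pred() until it returns true or the timeout expires. Returns the
    // result of one final attempt at the deadline, so a late success still counts.
    bool eventually(const std::function<bool()>& pred,
                    std::chrono::milliseconds timeout = std::chrono::seconds(30),
                    std::chrono::milliseconds interval = std::chrono::milliseconds(100)) {
        const auto deadline = std::chrono::steady_clock::now() + timeout;
        while (std::chrono::steady_clock::now() < deadline) {
            if (pred()) {
                return true;
            }
            std::this_thread::sleep_for(interval);
        }
        return pred();
    }
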
diff --git a/src/mongo/db/generic_cursor.idl b/src/mongo/db/generic_cursor.idl
index edb3929f6d5..0e7b796bd1a 100644
--- a/src/mongo/db/generic_cursor.idl
+++ b/src/mongo/db/generic_cursor.idl
@@ -35,6 +35,7 @@ global:
 
 imports:
     - "mongo/db/logical_session_id.idl"
+    - "mongo/db/repl/replication_types.idl"
     - "mongo/idl/basic_types.idl"
 
 structs:
@@ -92,3 +93,8 @@ structs:
                 description: The op ID of the operation pinning the cursor. Will be empty for idle cursors.
                 type: long
                 optional: true
+            lastKnownCommittedOpTime:
+                description: "The commit point known by the server at the time when the last batch was
+                              returned."
+                type: optime
+                optional: true
diff --git a/src/mongo/db/query/plan_executor_impl.cpp b/src/mongo/db/query/plan_executor_impl.cpp
index cc2df087217..c4e1bb83c7d 100644
--- a/src/mongo/db/query/plan_executor_impl.cpp
+++ b/src/mongo/db/query/plan_executor_impl.cpp
@@ -627,7 +627,14 @@ PlanExecutor::ExecState PlanExecutorImpl::_getNextImpl(Snapshotted<BSONObj>* ob
         } else if (PlanStage::NEED_TIME == code) {
             // Fall through to yield check at end of large conditional.
         } else if (PlanStage::IS_EOF == code) {
-            if (MONGO_unlikely(planExecutorHangBeforeShouldWaitForInserts.shouldFail())) {
+            if (MONGO_unlikely(planExecutorHangBeforeShouldWaitForInserts.shouldFail(
+                    [this](const BSONObj& data) {
+                        if (data.hasField("namespace") &&
+                            _nss != NamespaceString(data.getStringField("namespace"))) {
+                            return false;
+                        }
+                        return true;
+                    }))) {
                 log() << "PlanExecutor - planExecutorHangBeforeShouldWaitForInserts fail point "
                          "enabled. Blocking until fail point is disabled.";
                 planExecutorHangBeforeShouldWaitForInserts.pauseWhileSet();
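
The plan_executor_impl.cpp change above teaches planExecutorHangBeforeShouldWaitForInserts to honor an optional "namespace" field in its failpoint data, so only plan executors running against the targeted collection hang. A sketch of the configureFailPoint command document a test could send to use that filter; the helper name and the namespace string passed in are illustrative, and the headers assume the in-tree BSON builder used throughout this patch:

    #include <string>

    #include "mongo/bson/bsonmisc.h"
    #include "mongo/bson/bsonobjbuilder.h"

    // Builds a configureFailPoint command that hangs only plan executors for the
    // given namespace, relying on the "namespace" data filter added above.
    mongo::BSONObj makeHangBeforeWaitForInsertsCmd(const std::string& ns) {
        return BSON("configureFailPoint"
                    << "planExecutorHangBeforeShouldWaitForInserts"
                    << "mode"
                    << "alwaysOn"
                    << "data" << BSON("namespace" << ns));
    }

A test would run this command against the admin database before issuing its tailable query, and reset the failpoint with mode "off" afterwards so other executors are unaffected.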