diff options
author | Lingzhi Deng <lingzhi.deng@mongodb.com> | 2020-01-06 15:42:38 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2020-01-06 15:42:38 +0000 |
commit | d628d67126fb51cebf8960f4d73816f94f2cb99d (patch) | |
tree | e0386b8034ff732779969e49f875fda2cc815189 /src/mongo/db/exhaust_cursor_currentop_integration_test.cpp | |
parent | a047dc462b3e9f828ad43ea1ff5c920f633b17e9 (diff) | |
download | mongo-d628d67126fb51cebf8960f4d73816f94f2cb99d.tar.gz |
SERVER-44707: Store lastKnownCommittedOpTime in ClientCursor for exhaust getMore
Diffstat (limited to 'src/mongo/db/exhaust_cursor_currentop_integration_test.cpp')
-rw-r--r-- | src/mongo/db/exhaust_cursor_currentop_integration_test.cpp | 98 |
1 files changed, 95 insertions, 3 deletions
diff --git a/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp b/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp index 45a0a0b107f..e8fff744558 100644 --- a/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp +++ b/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp @@ -30,6 +30,7 @@ #include "mongo/platform/basic.h" #include "mongo/client/dbclient_connection.h" +#include "mongo/client/dbclient_rs.h" #include "mongo/db/query/cursor_response.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/stdx/future.h" @@ -115,15 +116,28 @@ bool confirmCurrentOpContents(DBClientBase* conn, return false; } +repl::OpTime getLastAppliedOpTime(DBClientBase* conn) { + auto reply = + conn->runCommand(OpMsgRequest::fromDBAndBody("admin", BSON("replSetGetStatus" << 1))); + ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply())); + auto lastAppliedOpTime = reply->getCommandReply()["optimes"]["appliedOpTime"]; + return repl::OpTime(lastAppliedOpTime["ts"].timestamp(), lastAppliedOpTime["t"].numberLong()); +} + // Start an exhaust request with a batchSize of 2 in the initial 'find' and a batchSize of 1 in // subsequent 'getMore's. -auto startExhaustQuery(DBClientBase* queryConnection, - std::unique_ptr<DBClientCursor>& queryCursor) { +auto startExhaustQuery( + DBClientBase* queryConnection, + std::unique_ptr<DBClientCursor>& queryCursor, + int queryOptions = 0, + Milliseconds awaitDataTimeoutMS = Milliseconds(5000), + const boost::optional<repl::OpTime>& lastKnownCommittedOpTime = boost::none) { + queryOptions = queryOptions | QueryOption_Exhaust; auto queryThread = stdx::async(stdx::launch::async, [&] { const auto projSpec = BSON("_id" << 0 << "a" << 1); // Issue the initial 'find' with a batchSize of 2 and the exhaust flag set. We then iterate // through the first batch and confirm that the results are as expected. - queryCursor = queryConnection->query(testNSS, {}, 0, 0, &projSpec, QueryOption_Exhaust, 2); + queryCursor = queryConnection->query(testNSS, {}, 0, 0, &projSpec, queryOptions, 2); for (int i = 0; i < 2; ++i) { ASSERT_BSONOBJ_EQ(queryCursor->nextSafe(), BSON("a" << i)); } @@ -133,6 +147,13 @@ auto startExhaustQuery(DBClientBase* queryConnection, // until the cursor is exhausted, without the client sending any further getMore requests. // We expect this request to hang at the 'waitWithPinnedCursorDuringGetMoreBatch' failpoint. queryCursor->setBatchSize(1); + if ((queryOptions & QueryOption_CursorTailable) && (queryOptions & QueryOption_AwaitData)) { + queryCursor->setAwaitDataTimeoutMS(awaitDataTimeoutMS); + if (lastKnownCommittedOpTime) { + auto term = lastKnownCommittedOpTime.get().getTerm(); + queryCursor->setCurrentTermAndLastCommittedOpTime(term, lastKnownCommittedOpTime); + } + } ASSERT(queryCursor->more()); }); @@ -301,4 +322,75 @@ TEST(CurrentOpExhaustCursorTest, CleanupExhaustCursorOnBrokenPipe) { // Test that exhaust cursor is cleaned up on broken pipe even if the exhaust getMore succeeded. testClientDisconnect(true /* disconnectAfterGetMoreBatch */); } + +TEST(CurrentOpExhaustCursorTest, ExhaustCursorUpdatesLastKnownCommittedOpTime) { + auto fixtureConn = connect(); + + // We need to test the lastKnownCommittedOpTime in exhaust getMore requests. So we need a + // replica set. + if (!fixtureConn->isReplicaSetMember()) { + return; + } + + // Connect directly to the primary. + DBClientBase* conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn(); + ASSERT(conn); + + conn->dropCollection(testNSS.ns()); + // Create a capped collection to run tailable awaitData queries on. + conn->createCollection(testNSS.ns(), + 1024 /* size of collection */, + true /* capped */, + 10 /* max number of objects */); + // Insert initial records into the capped collection. + for (int i = 0; i < 5; i++) { + auto insertCmd = + BSON("insert" << testNSS.coll() << "documents" << BSON_ARRAY(BSON("a" << i))); + auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody(testNSS.db(), insertCmd)); + ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply())); + } + + // Get the lastAppliedOpTime after the initial inserts. + auto lastAppliedOpTime = getLastAppliedOpTime(conn); + + // Create a new connection to the primary for the exhaust query. + const auto fixtureQueryConn = connect(testBackgroundAppName); + DBClientBase* queryConn = + &static_cast<DBClientReplicaSet*>(fixtureQueryConn.get())->masterConn(); + std::unique_ptr<DBClientCursor> queryCursor; + + // Initiate a tailable awaitData exhaust cursor with lastKnownCommittedOpTime being the + // lastAppliedOpTime. + auto queryThread = startExhaustQuery(queryConn, + queryCursor, + QueryOption_CursorTailable | QueryOption_AwaitData, + Milliseconds(1000), // awaitData timeout + lastAppliedOpTime); // lastKnownCommittedOpTime + ON_BLOCK_EXIT([&conn, &queryThread] { queryThread.wait(); }); + + // Test that the cursor's lastKnownCommittedOpTime is eventually advanced to the + // lastAppliedOpTime. + auto curOpMatch = BSON("command.collection" + << testNSS.coll() << "command.getMore" << queryCursor->getCursorId() + << "cursor.lastKnownCommittedOpTime" << lastAppliedOpTime); + ASSERT(confirmCurrentOpContents(conn, curOpMatch)); + + // Inserting more records to unblock awaitData and advance the commit point. + for (int i = 5; i < 8; i++) { + auto insertCmd = + BSON("insert" << testNSS.coll() << "documents" << BSON_ARRAY(BSON("a" << i))); + auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody(testNSS.db(), insertCmd)); + ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply())); + } + + // Get the new lastAppliedOpTime after the inserts. + lastAppliedOpTime = getLastAppliedOpTime(conn); + + // Test that the cursor's lastKnownCommittedOpTime is eventually advanced to the + // new lastAppliedOpTime. + curOpMatch = BSON("command.collection" + << testNSS.coll() << "command.getMore" << queryCursor->getCursorId() + << "cursor.lastKnownCommittedOpTime" << lastAppliedOpTime); + ASSERT(confirmCurrentOpContents(conn, curOpMatch)); +} } // namespace mongo |