summaryrefslogtreecommitdiff
path: root/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp
diff options
context:
space:
mode:
authorLingzhi Deng <lingzhi.deng@mongodb.com>2020-01-06 15:42:38 +0000
committerevergreen <evergreen@mongodb.com>2020-01-06 15:42:38 +0000
commitd628d67126fb51cebf8960f4d73816f94f2cb99d (patch)
treee0386b8034ff732779969e49f875fda2cc815189 /src/mongo/db/exhaust_cursor_currentop_integration_test.cpp
parenta047dc462b3e9f828ad43ea1ff5c920f633b17e9 (diff)
downloadmongo-d628d67126fb51cebf8960f4d73816f94f2cb99d.tar.gz
SERVER-44707: Store lastKnownCommittedOpTime in ClientCursor for exhaust getMore
Diffstat (limited to 'src/mongo/db/exhaust_cursor_currentop_integration_test.cpp')
-rw-r--r--src/mongo/db/exhaust_cursor_currentop_integration_test.cpp98
1 files changed, 95 insertions, 3 deletions
diff --git a/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp b/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp
index 45a0a0b107f..e8fff744558 100644
--- a/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp
+++ b/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp
@@ -30,6 +30,7 @@
#include "mongo/platform/basic.h"
#include "mongo/client/dbclient_connection.h"
+#include "mongo/client/dbclient_rs.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/stdx/future.h"
@@ -115,15 +116,28 @@ bool confirmCurrentOpContents(DBClientBase* conn,
return false;
}
+repl::OpTime getLastAppliedOpTime(DBClientBase* conn) {
+ auto reply =
+ conn->runCommand(OpMsgRequest::fromDBAndBody("admin", BSON("replSetGetStatus" << 1)));
+ ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply()));
+ auto lastAppliedOpTime = reply->getCommandReply()["optimes"]["appliedOpTime"];
+ return repl::OpTime(lastAppliedOpTime["ts"].timestamp(), lastAppliedOpTime["t"].numberLong());
+}
+
// Start an exhaust request with a batchSize of 2 in the initial 'find' and a batchSize of 1 in
// subsequent 'getMore's.
-auto startExhaustQuery(DBClientBase* queryConnection,
- std::unique_ptr<DBClientCursor>& queryCursor) {
+auto startExhaustQuery(
+ DBClientBase* queryConnection,
+ std::unique_ptr<DBClientCursor>& queryCursor,
+ int queryOptions = 0,
+ Milliseconds awaitDataTimeoutMS = Milliseconds(5000),
+ const boost::optional<repl::OpTime>& lastKnownCommittedOpTime = boost::none) {
+ queryOptions = queryOptions | QueryOption_Exhaust;
auto queryThread = stdx::async(stdx::launch::async, [&] {
const auto projSpec = BSON("_id" << 0 << "a" << 1);
// Issue the initial 'find' with a batchSize of 2 and the exhaust flag set. We then iterate
// through the first batch and confirm that the results are as expected.
- queryCursor = queryConnection->query(testNSS, {}, 0, 0, &projSpec, QueryOption_Exhaust, 2);
+ queryCursor = queryConnection->query(testNSS, {}, 0, 0, &projSpec, queryOptions, 2);
for (int i = 0; i < 2; ++i) {
ASSERT_BSONOBJ_EQ(queryCursor->nextSafe(), BSON("a" << i));
}
@@ -133,6 +147,13 @@ auto startExhaustQuery(DBClientBase* queryConnection,
// until the cursor is exhausted, without the client sending any further getMore requests.
// We expect this request to hang at the 'waitWithPinnedCursorDuringGetMoreBatch' failpoint.
queryCursor->setBatchSize(1);
+ if ((queryOptions & QueryOption_CursorTailable) && (queryOptions & QueryOption_AwaitData)) {
+ queryCursor->setAwaitDataTimeoutMS(awaitDataTimeoutMS);
+ if (lastKnownCommittedOpTime) {
+ auto term = lastKnownCommittedOpTime.get().getTerm();
+ queryCursor->setCurrentTermAndLastCommittedOpTime(term, lastKnownCommittedOpTime);
+ }
+ }
ASSERT(queryCursor->more());
});
@@ -301,4 +322,75 @@ TEST(CurrentOpExhaustCursorTest, CleanupExhaustCursorOnBrokenPipe) {
// Test that exhaust cursor is cleaned up on broken pipe even if the exhaust getMore succeeded.
testClientDisconnect(true /* disconnectAfterGetMoreBatch */);
}
+
+TEST(CurrentOpExhaustCursorTest, ExhaustCursorUpdatesLastKnownCommittedOpTime) {
+ auto fixtureConn = connect();
+
+ // We need to test the lastKnownCommittedOpTime in exhaust getMore requests. So we need a
+ // replica set.
+ if (!fixtureConn->isReplicaSetMember()) {
+ return;
+ }
+
+ // Connect directly to the primary.
+ DBClientBase* conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn();
+ ASSERT(conn);
+
+ conn->dropCollection(testNSS.ns());
+ // Create a capped collection to run tailable awaitData queries on.
+ conn->createCollection(testNSS.ns(),
+ 1024 /* size of collection */,
+ true /* capped */,
+ 10 /* max number of objects */);
+ // Insert initial records into the capped collection.
+ for (int i = 0; i < 5; i++) {
+ auto insertCmd =
+ BSON("insert" << testNSS.coll() << "documents" << BSON_ARRAY(BSON("a" << i)));
+ auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody(testNSS.db(), insertCmd));
+ ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply()));
+ }
+
+ // Get the lastAppliedOpTime after the initial inserts.
+ auto lastAppliedOpTime = getLastAppliedOpTime(conn);
+
+ // Create a new connection to the primary for the exhaust query.
+ const auto fixtureQueryConn = connect(testBackgroundAppName);
+ DBClientBase* queryConn =
+ &static_cast<DBClientReplicaSet*>(fixtureQueryConn.get())->masterConn();
+ std::unique_ptr<DBClientCursor> queryCursor;
+
+ // Initiate a tailable awaitData exhaust cursor with lastKnownCommittedOpTime being the
+ // lastAppliedOpTime.
+ auto queryThread = startExhaustQuery(queryConn,
+ queryCursor,
+ QueryOption_CursorTailable | QueryOption_AwaitData,
+ Milliseconds(1000), // awaitData timeout
+ lastAppliedOpTime); // lastKnownCommittedOpTime
+ ON_BLOCK_EXIT([&conn, &queryThread] { queryThread.wait(); });
+
+ // Test that the cursor's lastKnownCommittedOpTime is eventually advanced to the
+ // lastAppliedOpTime.
+ auto curOpMatch = BSON("command.collection"
+ << testNSS.coll() << "command.getMore" << queryCursor->getCursorId()
+ << "cursor.lastKnownCommittedOpTime" << lastAppliedOpTime);
+ ASSERT(confirmCurrentOpContents(conn, curOpMatch));
+
+ // Inserting more records to unblock awaitData and advance the commit point.
+ for (int i = 5; i < 8; i++) {
+ auto insertCmd =
+ BSON("insert" << testNSS.coll() << "documents" << BSON_ARRAY(BSON("a" << i)));
+ auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody(testNSS.db(), insertCmd));
+ ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply()));
+ }
+
+ // Get the new lastAppliedOpTime after the inserts.
+ lastAppliedOpTime = getLastAppliedOpTime(conn);
+
+ // Test that the cursor's lastKnownCommittedOpTime is eventually advanced to the
+ // new lastAppliedOpTime.
+ curOpMatch = BSON("command.collection"
+ << testNSS.coll() << "command.getMore" << queryCursor->getCursorId()
+ << "cursor.lastKnownCommittedOpTime" << lastAppliedOpTime);
+ ASSERT(confirmCurrentOpContents(conn, curOpMatch));
+}
} // namespace mongo