summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorLingzhi Deng <lingzhi.deng@mongodb.com>2020-01-06 15:42:38 +0000
committerevergreen <evergreen@mongodb.com>2020-01-06 15:42:38 +0000
commitd628d67126fb51cebf8960f4d73816f94f2cb99d (patch)
treee0386b8034ff732779969e49f875fda2cc815189 /src/mongo/db
parenta047dc462b3e9f828ad43ea1ff5c920f633b17e9 (diff)
downloadmongo-d628d67126fb51cebf8960f4d73816f94f2cb99d.tar.gz
SERVER-44707: Store lastKnownCommittedOpTime in ClientCursor for exhaust getMore
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/clientcursor.cpp1
-rw-r--r--src/mongo/db/clientcursor.h22
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp18
-rw-r--r--src/mongo/db/exhaust_cursor_currentop_integration_test.cpp98
-rw-r--r--src/mongo/db/generic_cursor.idl6
-rw-r--r--src/mongo/db/query/plan_executor_impl.cpp9
7 files changed, 144 insertions, 11 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 18f5e8e6795..18fbf1e190b 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1664,6 +1664,7 @@ env.Library(
],
LIBDEPS=[
'service_context',
+ '$BUILD_DIR/mongo/db/repl/optime',
'$BUILD_DIR/mongo/idl/idl_parser',
],
)
diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp
index 2e87deb2fd0..7bee0fb09f9 100644
--- a/src/mongo/db/clientcursor.cpp
+++ b/src/mongo/db/clientcursor.cpp
@@ -152,6 +152,7 @@ GenericCursor ClientCursor::toGenericCursor() const {
if (auto opCtx = _operationUsingCursor) {
gc.setOperationUsingCursorId(opCtx->getOpID());
}
+ gc.setLastKnownCommittedOpTime(_lastKnownCommittedOpTime);
return gc;
}
diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h
index 2f5663012d8..7496c9e08b7 100644
--- a/src/mongo/db/clientcursor.h
+++ b/src/mongo/db/clientcursor.h
@@ -282,10 +282,6 @@ public:
*/
GenericCursor toGenericCursor() const;
- //
- // Timing.
- //
-
/**
* Returns the amount of time execution time available to this cursor. Only valid at the
* beginning of a getMore request, and only really for use by the maxTime tracking code.
@@ -307,6 +303,20 @@ public:
}
/**
+ * Returns the commit point at the time the last batch was returned.
+ */
+ boost::optional<repl::OpTime> getLastKnownCommittedOpTime() const {
+ return _lastKnownCommittedOpTime;
+ }
+
+ /**
+ * Sets the commit point at the time the last batch was returned.
+ */
+ void setLastKnownCommittedOpTime(boost::optional<repl::OpTime> lastCommittedOpTime) {
+ _lastKnownCommittedOpTime = std::move(lastCommittedOpTime);
+ }
+
+ /**
* Returns the server-wide the count of living cursors. Such a cursor is called an "open
* cursor".
*/
@@ -443,6 +453,10 @@ private:
// A string with the plan summary of the cursor's query.
std::string _planSummary;
+
+ // Commit point at the time the last batch was returned. This is only used by internal exhaust
+ // oplog fetching. Also see lastKnownCommittedOpTime in GetMoreRequest.
+ boost::optional<repl::OpTime> _lastKnownCommittedOpTime;
};
/**
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index 1ef70f0d2d0..1374cb1715d 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -563,9 +563,15 @@ public:
// Mark this as an AwaitData operation if appropriate.
if (cursorPin->isAwaitData() && !disableAwaitDataFailpointActive) {
- if (_request.lastKnownCommittedOpTime)
- clientsLastKnownCommittedOpTime(opCtx) =
- _request.lastKnownCommittedOpTime.get();
+ auto lastKnownCommittedOpTime = _request.lastKnownCommittedOpTime;
+ if (opCtx->isExhaust() && cursorPin->getLastKnownCommittedOpTime()) {
+ // Use the commit point of the last batch for exhaust cursors.
+ lastKnownCommittedOpTime = cursorPin->getLastKnownCommittedOpTime();
+ }
+ if (lastKnownCommittedOpTime) {
+ clientsLastKnownCommittedOpTime(opCtx) = lastKnownCommittedOpTime.get();
+ }
+
awaitDataState(opCtx).shouldWaitForInserts = true;
}
@@ -622,6 +628,12 @@ public:
cursorPin->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros());
cursorPin->incNReturnedSoFar(numResults);
cursorPin->incNBatches();
+
+ if (opCtx->isExhaust() && !clientsLastKnownCommittedOpTime(opCtx).isNull()) {
+ // Set the commit point of the latest batch.
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ cursorPin->setLastKnownCommittedOpTime(replCoord->getLastCommittedOpTime());
+ }
} else {
curOp->debug().cursorExhausted = true;
}
diff --git a/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp b/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp
index 45a0a0b107f..e8fff744558 100644
--- a/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp
+++ b/src/mongo/db/exhaust_cursor_currentop_integration_test.cpp
@@ -30,6 +30,7 @@
#include "mongo/platform/basic.h"
#include "mongo/client/dbclient_connection.h"
+#include "mongo/client/dbclient_rs.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/stdx/future.h"
@@ -115,15 +116,28 @@ bool confirmCurrentOpContents(DBClientBase* conn,
return false;
}
+repl::OpTime getLastAppliedOpTime(DBClientBase* conn) {
+ auto reply =
+ conn->runCommand(OpMsgRequest::fromDBAndBody("admin", BSON("replSetGetStatus" << 1)));
+ ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply()));
+ auto lastAppliedOpTime = reply->getCommandReply()["optimes"]["appliedOpTime"];
+ return repl::OpTime(lastAppliedOpTime["ts"].timestamp(), lastAppliedOpTime["t"].numberLong());
+}
+
// Start an exhaust request with a batchSize of 2 in the initial 'find' and a batchSize of 1 in
// subsequent 'getMore's.
-auto startExhaustQuery(DBClientBase* queryConnection,
- std::unique_ptr<DBClientCursor>& queryCursor) {
+auto startExhaustQuery(
+ DBClientBase* queryConnection,
+ std::unique_ptr<DBClientCursor>& queryCursor,
+ int queryOptions = 0,
+ Milliseconds awaitDataTimeoutMS = Milliseconds(5000),
+ const boost::optional<repl::OpTime>& lastKnownCommittedOpTime = boost::none) {
+ queryOptions = queryOptions | QueryOption_Exhaust;
auto queryThread = stdx::async(stdx::launch::async, [&] {
const auto projSpec = BSON("_id" << 0 << "a" << 1);
// Issue the initial 'find' with a batchSize of 2 and the exhaust flag set. We then iterate
// through the first batch and confirm that the results are as expected.
- queryCursor = queryConnection->query(testNSS, {}, 0, 0, &projSpec, QueryOption_Exhaust, 2);
+ queryCursor = queryConnection->query(testNSS, {}, 0, 0, &projSpec, queryOptions, 2);
for (int i = 0; i < 2; ++i) {
ASSERT_BSONOBJ_EQ(queryCursor->nextSafe(), BSON("a" << i));
}
@@ -133,6 +147,13 @@ auto startExhaustQuery(DBClientBase* queryConnection,
// until the cursor is exhausted, without the client sending any further getMore requests.
// We expect this request to hang at the 'waitWithPinnedCursorDuringGetMoreBatch' failpoint.
queryCursor->setBatchSize(1);
+ if ((queryOptions & QueryOption_CursorTailable) && (queryOptions & QueryOption_AwaitData)) {
+ queryCursor->setAwaitDataTimeoutMS(awaitDataTimeoutMS);
+ if (lastKnownCommittedOpTime) {
+ auto term = lastKnownCommittedOpTime.get().getTerm();
+ queryCursor->setCurrentTermAndLastCommittedOpTime(term, lastKnownCommittedOpTime);
+ }
+ }
ASSERT(queryCursor->more());
});
@@ -301,4 +322,75 @@ TEST(CurrentOpExhaustCursorTest, CleanupExhaustCursorOnBrokenPipe) {
// Test that exhaust cursor is cleaned up on broken pipe even if the exhaust getMore succeeded.
testClientDisconnect(true /* disconnectAfterGetMoreBatch */);
}
+
+TEST(CurrentOpExhaustCursorTest, ExhaustCursorUpdatesLastKnownCommittedOpTime) {
+ auto fixtureConn = connect();
+
+ // We need to test the lastKnownCommittedOpTime in exhaust getMore requests. So we need a
+ // replica set.
+ if (!fixtureConn->isReplicaSetMember()) {
+ return;
+ }
+
+ // Connect directly to the primary.
+ DBClientBase* conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn();
+ ASSERT(conn);
+
+ conn->dropCollection(testNSS.ns());
+ // Create a capped collection to run tailable awaitData queries on.
+ conn->createCollection(testNSS.ns(),
+ 1024 /* size of collection */,
+ true /* capped */,
+ 10 /* max number of objects */);
+ // Insert initial records into the capped collection.
+ for (int i = 0; i < 5; i++) {
+ auto insertCmd =
+ BSON("insert" << testNSS.coll() << "documents" << BSON_ARRAY(BSON("a" << i)));
+ auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody(testNSS.db(), insertCmd));
+ ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply()));
+ }
+
+ // Get the lastAppliedOpTime after the initial inserts.
+ auto lastAppliedOpTime = getLastAppliedOpTime(conn);
+
+ // Create a new connection to the primary for the exhaust query.
+ const auto fixtureQueryConn = connect(testBackgroundAppName);
+ DBClientBase* queryConn =
+ &static_cast<DBClientReplicaSet*>(fixtureQueryConn.get())->masterConn();
+ std::unique_ptr<DBClientCursor> queryCursor;
+
+ // Initiate a tailable awaitData exhaust cursor with lastKnownCommittedOpTime being the
+ // lastAppliedOpTime.
+ auto queryThread = startExhaustQuery(queryConn,
+ queryCursor,
+ QueryOption_CursorTailable | QueryOption_AwaitData,
+ Milliseconds(1000), // awaitData timeout
+ lastAppliedOpTime); // lastKnownCommittedOpTime
+ ON_BLOCK_EXIT([&conn, &queryThread] { queryThread.wait(); });
+
+ // Test that the cursor's lastKnownCommittedOpTime is eventually advanced to the
+ // lastAppliedOpTime.
+ auto curOpMatch = BSON("command.collection"
+ << testNSS.coll() << "command.getMore" << queryCursor->getCursorId()
+ << "cursor.lastKnownCommittedOpTime" << lastAppliedOpTime);
+ ASSERT(confirmCurrentOpContents(conn, curOpMatch));
+
+ // Inserting more records to unblock awaitData and advance the commit point.
+ for (int i = 5; i < 8; i++) {
+ auto insertCmd =
+ BSON("insert" << testNSS.coll() << "documents" << BSON_ARRAY(BSON("a" << i)));
+ auto reply = conn->runCommand(OpMsgRequest::fromDBAndBody(testNSS.db(), insertCmd));
+ ASSERT_OK(getStatusFromCommandResult(reply->getCommandReply()));
+ }
+
+ // Get the new lastAppliedOpTime after the inserts.
+ lastAppliedOpTime = getLastAppliedOpTime(conn);
+
+ // Test that the cursor's lastKnownCommittedOpTime is eventually advanced to the
+ // new lastAppliedOpTime.
+ curOpMatch = BSON("command.collection"
+ << testNSS.coll() << "command.getMore" << queryCursor->getCursorId()
+ << "cursor.lastKnownCommittedOpTime" << lastAppliedOpTime);
+ ASSERT(confirmCurrentOpContents(conn, curOpMatch));
+}
} // namespace mongo
diff --git a/src/mongo/db/generic_cursor.idl b/src/mongo/db/generic_cursor.idl
index edb3929f6d5..0e7b796bd1a 100644
--- a/src/mongo/db/generic_cursor.idl
+++ b/src/mongo/db/generic_cursor.idl
@@ -35,6 +35,7 @@ global:
imports:
- "mongo/db/logical_session_id.idl"
+ - "mongo/db/repl/replication_types.idl"
- "mongo/idl/basic_types.idl"
structs:
@@ -92,3 +93,8 @@ structs:
description: The op ID of the operation pinning the cursor. Will be empty for idle cursors.
type: long
optional: true
+ lastKnownCommittedOpTime:
+ description: "The commit point known by the server at the time when the last batch was
+ returned."
+ type: optime
+ optional: true
diff --git a/src/mongo/db/query/plan_executor_impl.cpp b/src/mongo/db/query/plan_executor_impl.cpp
index cc2df087217..c4e1bb83c7d 100644
--- a/src/mongo/db/query/plan_executor_impl.cpp
+++ b/src/mongo/db/query/plan_executor_impl.cpp
@@ -627,7 +627,14 @@ PlanExecutor::ExecState PlanExecutorImpl::_getNextImpl(Snapshotted<Document>* ob
} else if (PlanStage::NEED_TIME == code) {
// Fall through to yield check at end of large conditional.
} else if (PlanStage::IS_EOF == code) {
- if (MONGO_unlikely(planExecutorHangBeforeShouldWaitForInserts.shouldFail())) {
+ if (MONGO_unlikely(planExecutorHangBeforeShouldWaitForInserts.shouldFail(
+ [this](const BSONObj& data) {
+ if (data.hasField("namespace") &&
+ _nss != NamespaceString(data.getStringField("namespace"))) {
+ return false;
+ }
+ return true;
+ }))) {
log() << "PlanExecutor - planExecutorHangBeforeShouldWaitForInserts fail point "
"enabled. Blocking until fail point is disabled.";
planExecutorHangBeforeShouldWaitForInserts.pauseWhileSet();