summaryrefslogtreecommitdiff
path: root/src/mongo/client
diff options
context:
space:
mode:
authorLingzhi Deng <lingzhi.deng@mongodb.com>2019-12-19 21:38:39 +0000
committerevergreen <evergreen@mongodb.com>2019-12-19 21:38:39 +0000
commitd1ba3bc8890f54e5dad91da85ce56626859c166c (patch)
tree4dac59147e0774516f743715424ecf6765609f39 /src/mongo/client
parentb4db881a18cbe15127a5a60c971cd393e0621466 (diff)
downloadmongo-d1ba3bc8890f54e5dad91da85ce56626859c166c.tar.gz
SERVER-45232: Support oplog query in DBClientCursor
Diffstat (limited to 'src/mongo/client')
-rw-r--r--src/mongo/client/dbclient_cursor.cpp12
-rw-r--r--src/mongo/client/dbclient_cursor.h12
-rw-r--r--src/mongo/client/dbclient_cursor_test.cpp84
3 files changed, 106 insertions, 2 deletions
diff --git a/src/mongo/client/dbclient_cursor.cpp b/src/mongo/client/dbclient_cursor.cpp
index df26ca3e9c6..fe677d9f00c 100644
--- a/src/mongo/client/dbclient_cursor.cpp
+++ b/src/mongo/client/dbclient_cursor.cpp
@@ -131,6 +131,14 @@ Message DBClientCursor::_assembleInit() {
// Legacy queries don't handle readOnce.
qr.getValue()->setReadOnce(true);
}
+ if (auto replTerm = query[QueryRequest::kTermField]) {
+ // Legacy queries don't handle term.
+ qr.getValue()->setReplicationTerm(replTerm.numberLong());
+ }
+ if (auto readConcern = query[repl::ReadConcernArgs::kReadConcernFieldName]) {
+ // Legacy queries don't handle readConcern.
+ qr.getValue()->setReadConcern(readConcern.Obj());
+ }
BSONObj cmd = _nsOrUuid.uuid() ? qr.getValue()->asFindCommandWithUuid()
: qr.getValue()->asFindCommand();
if (auto readPref = query["$readPreference"]) {
@@ -164,8 +172,8 @@ Message DBClientCursor::_assembleGetMore() {
boost::make_optional(batchSize != 0, batchSize),
boost::make_optional(tailableAwaitData(),
_awaitDataTimeout), // awaitDataTimeout
- boost::none, // term
- boost::none); // lastKnownCommittedOptime
+ _term,
+ _lastKnownCommittedOpTime);
auto msg = assembleCommandRequest(_client, ns.db(), opts, gmr.toBSON());
// Set the exhaust flag if needed.
if (opts & QueryOption_Exhaust && msg.operation() == dbMsg) {
diff --git a/src/mongo/client/dbclient_cursor.h b/src/mongo/client/dbclient_cursor.h
index 4d7dd74dd1b..e8c7d91f8b8 100644
--- a/src/mongo/client/dbclient_cursor.h
+++ b/src/mongo/client/dbclient_cursor.h
@@ -235,6 +235,16 @@ public:
_awaitDataTimeout = timeout;
}
+ // Only used for tailable awaitData oplog fetching requests.
+ void setCurrentTermAndLastCommittedOpTime(
+ const boost::optional<long long>& term,
+ const boost::optional<repl::OpTime>& lastCommittedOpTime) {
+ invariant(tailableAwaitData());
+ invariant(ns == NamespaceString::kRsOplogNamespace);
+ _term = term;
+ _lastKnownCommittedOpTime = lastCommittedOpTime;
+ }
+
protected:
struct Batch {
// TODO remove constructors after c++17 toolchain upgrade
@@ -288,6 +298,8 @@ private:
bool _connectionHasPendingReplies = false;
int _lastRequestId = 0;
Milliseconds _awaitDataTimeout = Milliseconds{0};
+ boost::optional<long long> _term;
+ boost::optional<repl::OpTime> _lastKnownCommittedOpTime;
void dataReceived(const Message& reply) {
bool retry;
diff --git a/src/mongo/client/dbclient_cursor_test.cpp b/src/mongo/client/dbclient_cursor_test.cpp
index d9f42d1ae7f..9df9dd01405 100644
--- a/src/mongo/client/dbclient_cursor_test.cpp
+++ b/src/mongo/client/dbclient_cursor_test.cpp
@@ -754,6 +754,90 @@ TEST_F(DBClientCursorTest, DBClientCursorTailableAwaitDataExhaust) {
ASSERT_TRUE(conn.getLastSentMessage().empty());
}
+TEST_F(DBClientCursorTest, DBClientCursorOplogQuery) {
+ // This tests DBClientCursor supports oplog query with special fields in the command request.
+ // 1. Initial find command has "filter", "tailable", "awaitData", "oplogReplay", "maxTimeMS",
+ // "batchSize", "term" and "readConcern" fields set.
+ // 2. A subsequent getMore command sets awaitData timeout and lastKnownCommittedOpTime
+ // correctly.
+
+ // Set up the DBClientCursor and a mock client connection.
+ DBClientConnectionForTest conn;
+ const NamespaceString nss = NamespaceString::kRsOplogNamespace;
+ const BSONObj filterObj = BSON("ts" << BSON("$gte" << Timestamp(123, 4)));
+ const BSONObj readConcernObj = BSON("afterClusterTime" << Timestamp(0, 1));
+ const long long maxTimeMS = 5000LL;
+ const long long term = 5;
+ const auto oplogQuery = QUERY("query" << filterObj << "readConcern" << readConcernObj
+ << "$maxTimeMS" << maxTimeMS << "term" << term);
+
+ DBClientCursor cursor(&conn,
+ NamespaceStringOrUUID(nss),
+ oplogQuery.obj,
+ 0,
+ 0,
+ nullptr,
+ QueryOption_CursorTailable | QueryOption_AwaitData |
+ QueryOption_OplogReplay,
+ 0);
+ cursor.setBatchSize(0);
+
+ // Set up mock 'find' response.
+ const long long cursorId = 42;
+ Message findResponseMsg = mockFindResponse(nss, cursorId, {});
+
+ conn.setCallResponse(findResponseMsg);
+ ASSERT(cursor.init());
+
+ // --- Test 1 ---
+ // Verify that the initial 'find' request was sent.
+ auto m = conn.getLastSentMessage();
+ ASSERT_FALSE(m.empty());
+ auto msg = OpMsg::parse(m);
+ ASSERT_EQ(OpMsg::flags(m), OpMsg::kChecksumPresent);
+ ASSERT_EQ(msg.body.getStringField("find"), nss.coll()) << msg.body;
+ ASSERT_BSONOBJ_EQ(msg.body["filter"].Obj(), filterObj);
+ ASSERT_TRUE(msg.body.getBoolField("tailable")) << msg.body;
+ ASSERT_TRUE(msg.body.getBoolField("awaitData")) << msg.body;
+ ASSERT_TRUE(msg.body.getBoolField("oplogReplay")) << msg.body;
+ ASSERT_EQ(msg.body["maxTimeMS"].numberLong(), maxTimeMS) << msg.body;
+ ASSERT_EQ(msg.body["batchSize"].number(), 0) << msg.body;
+ ASSERT_EQ(msg.body["term"].numberLong(), term) << msg.body;
+ ASSERT_BSONOBJ_EQ(msg.body["readConcern"].Obj(), readConcernObj);
+
+ cursor.setAwaitDataTimeoutMS(Milliseconds{5000});
+ ASSERT_EQ(cursor.getAwaitDataTimeoutMS(), Milliseconds{5000});
+
+ cursor.setCurrentTermAndLastCommittedOpTime(term, repl::OpTime(Timestamp(123, 4), term));
+
+ // --- Test 2 ---
+ // Create a 'getMore' response with two documents and set it as the mock response.
+ cursor.setBatchSize(2);
+ auto getMoreResponseMsg = mockGetMoreResponse(nss, cursorId, {docObj(1), docObj(2)});
+ conn.setCallResponse(getMoreResponseMsg);
+
+ // Request more results. This call should trigger the first 'getMore' request.
+ conn.clearLastSentMessage();
+ ASSERT_TRUE(cursor.more());
+ m = conn.getLastSentMessage();
+ ASSERT_FALSE(m.empty());
+ msg = OpMsg::parse(m);
+ ASSERT_EQ(StringData(msg.body.firstElement().fieldName()), "getMore") << msg.body;
+ ASSERT_EQ(msg.body["getMore"].type(), BSONType::NumberLong) << msg.body;
+ ASSERT_EQ(msg.body["getMore"].numberLong(), cursorId) << msg.body;
+ // Make sure the correct awaitData timeout is sent.
+ ASSERT_EQ(msg.body["maxTimeMS"].number(), 5000) << msg.body;
+ // Make sure the correct term is sent.
+ ASSERT_EQ(msg.body["term"].numberLong(), term) << msg.body;
+ // Make sure the correct lastKnownCommittedOpTime is sent.
+ ASSERT_EQ(msg.body["lastKnownCommittedOpTime"]["ts"].timestamp(), Timestamp(123, 4))
+ << msg.body;
+ ASSERT_EQ(msg.body["lastKnownCommittedOpTime"]["t"].numberLong(), term) << msg.body;
+ ASSERT_BSONOBJ_EQ(docObj(1), cursor.next());
+ ASSERT_BSONOBJ_EQ(docObj(2), cursor.next());
+ ASSERT_FALSE(cursor.moreInCurrentBatch());
+ ASSERT_FALSE(cursor.isDead());
+}
} // namespace
} // namespace mongo