summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorLingzhi Deng <lingzhi.deng@mongodb.com>2019-12-19 21:38:39 +0000
committerevergreen <evergreen@mongodb.com>2019-12-19 21:38:39 +0000
commitd1ba3bc8890f54e5dad91da85ce56626859c166c (patch)
tree4dac59147e0774516f743715424ecf6765609f39 /src
parentb4db881a18cbe15127a5a60c971cd393e0621466 (diff)
downloadmongo-d1ba3bc8890f54e5dad91da85ce56626859c166c.tar.gz
SERVER-45232: Support oplog query in DBClientCursor
Diffstat (limited to 'src')
-rw-r--r--src/mongo/client/dbclient_cursor.cpp12
-rw-r--r--src/mongo/client/dbclient_cursor.h12
-rw-r--r--src/mongo/client/dbclient_cursor_test.cpp84
-rw-r--r--src/mongo/db/query/query_request.cpp62
-rw-r--r--src/mongo/db/query/query_request.h31
5 files changed, 168 insertions, 33 deletions
diff --git a/src/mongo/client/dbclient_cursor.cpp b/src/mongo/client/dbclient_cursor.cpp
index df26ca3e9c6..fe677d9f00c 100644
--- a/src/mongo/client/dbclient_cursor.cpp
+++ b/src/mongo/client/dbclient_cursor.cpp
@@ -131,6 +131,14 @@ Message DBClientCursor::_assembleInit() {
// Legacy queries don't handle readOnce.
qr.getValue()->setReadOnce(true);
}
+ if (auto replTerm = query[QueryRequest::kTermField]) {
+ // Legacy queries don't handle term.
+ qr.getValue()->setReplicationTerm(replTerm.numberLong());
+ }
+ if (auto readConcern = query[repl::ReadConcernArgs::kReadConcernFieldName]) {
+ // Legacy queries don't handle readConcern.
+ qr.getValue()->setReadConcern(readConcern.Obj());
+ }
BSONObj cmd = _nsOrUuid.uuid() ? qr.getValue()->asFindCommandWithUuid()
: qr.getValue()->asFindCommand();
if (auto readPref = query["$readPreference"]) {
@@ -164,8 +172,8 @@ Message DBClientCursor::_assembleGetMore() {
boost::make_optional(batchSize != 0, batchSize),
boost::make_optional(tailableAwaitData(),
_awaitDataTimeout), // awaitDataTimeout
- boost::none, // term
- boost::none); // lastKnownCommittedOptime
+ _term,
+ _lastKnownCommittedOpTime);
auto msg = assembleCommandRequest(_client, ns.db(), opts, gmr.toBSON());
// Set the exhaust flag if needed.
if (opts & QueryOption_Exhaust && msg.operation() == dbMsg) {
diff --git a/src/mongo/client/dbclient_cursor.h b/src/mongo/client/dbclient_cursor.h
index 4d7dd74dd1b..e8c7d91f8b8 100644
--- a/src/mongo/client/dbclient_cursor.h
+++ b/src/mongo/client/dbclient_cursor.h
@@ -235,6 +235,16 @@ public:
_awaitDataTimeout = timeout;
}
+ // Only used for tailable awaitData oplog fetching requests.
+ void setCurrentTermAndLastCommittedOpTime(
+ const boost::optional<long long>& term,
+ const boost::optional<repl::OpTime>& lastCommittedOpTime) {
+ invariant(tailableAwaitData());
+ invariant(ns == NamespaceString::kRsOplogNamespace);
+ _term = term;
+ _lastKnownCommittedOpTime = lastCommittedOpTime;
+ }
+
protected:
struct Batch {
// TODO remove constructors after c++17 toolchain upgrade
@@ -288,6 +298,8 @@ private:
bool _connectionHasPendingReplies = false;
int _lastRequestId = 0;
Milliseconds _awaitDataTimeout = Milliseconds{0};
+ boost::optional<long long> _term;
+ boost::optional<repl::OpTime> _lastKnownCommittedOpTime;
void dataReceived(const Message& reply) {
bool retry;
diff --git a/src/mongo/client/dbclient_cursor_test.cpp b/src/mongo/client/dbclient_cursor_test.cpp
index d9f42d1ae7f..9df9dd01405 100644
--- a/src/mongo/client/dbclient_cursor_test.cpp
+++ b/src/mongo/client/dbclient_cursor_test.cpp
@@ -754,6 +754,90 @@ TEST_F(DBClientCursorTest, DBClientCursorTailableAwaitDataExhaust) {
ASSERT_TRUE(conn.getLastSentMessage().empty());
}
+TEST_F(DBClientCursorTest, DBClientCursorOplogQuery) {
+ // This tests DBClientCursor supports oplog query with special fields in the command request.
+ // 1. Initial find command has "filter", "tailable", "awaitData", "oplogReplay", "maxTimeMS",
+ // "batchSize", "term" and "readConcern" fields set.
+ // 2. A subsequent getMore command sets awaitData timeout and lastKnownCommittedOpTime
+ // correctly.
+
+ // Set up the DBClientCursor and a mock client connection.
+ DBClientConnectionForTest conn;
+ const NamespaceString nss = NamespaceString::kRsOplogNamespace;
+ const BSONObj filterObj = BSON("ts" << BSON("$gte" << Timestamp(123, 4)));
+ const BSONObj readConcernObj = BSON("afterClusterTime" << Timestamp(0, 1));
+ const long long maxTimeMS = 5000LL;
+ const long long term = 5;
+ const auto oplogQuery = QUERY("query" << filterObj << "readConcern" << readConcernObj
+ << "$maxTimeMS" << maxTimeMS << "term" << term);
+
+ DBClientCursor cursor(&conn,
+ NamespaceStringOrUUID(nss),
+ oplogQuery.obj,
+ 0,
+ 0,
+ nullptr,
+ QueryOption_CursorTailable | QueryOption_AwaitData |
+ QueryOption_OplogReplay,
+ 0);
+ cursor.setBatchSize(0);
+
+ // Set up mock 'find' response.
+ const long long cursorId = 42;
+ Message findResponseMsg = mockFindResponse(nss, cursorId, {});
+
+ conn.setCallResponse(findResponseMsg);
+ ASSERT(cursor.init());
+
+ // --- Test 1 ---
+ // Verify that the initial 'find' request was sent.
+ auto m = conn.getLastSentMessage();
+ ASSERT_FALSE(m.empty());
+ auto msg = OpMsg::parse(m);
+ ASSERT_EQ(OpMsg::flags(m), OpMsg::kChecksumPresent);
+ ASSERT_EQ(msg.body.getStringField("find"), nss.coll()) << msg.body;
+ ASSERT_BSONOBJ_EQ(msg.body["filter"].Obj(), filterObj);
+ ASSERT_TRUE(msg.body.getBoolField("tailable")) << msg.body;
+ ASSERT_TRUE(msg.body.getBoolField("awaitData")) << msg.body;
+ ASSERT_TRUE(msg.body.getBoolField("oplogReplay")) << msg.body;
+ ASSERT_EQ(msg.body["maxTimeMS"].numberLong(), maxTimeMS) << msg.body;
+ ASSERT_EQ(msg.body["batchSize"].number(), 0) << msg.body;
+ ASSERT_EQ(msg.body["term"].numberLong(), term) << msg.body;
+ ASSERT_BSONOBJ_EQ(msg.body["readConcern"].Obj(), readConcernObj);
+
+ cursor.setAwaitDataTimeoutMS(Milliseconds{5000});
+ ASSERT_EQ(cursor.getAwaitDataTimeoutMS(), Milliseconds{5000});
+
+ cursor.setCurrentTermAndLastCommittedOpTime(term, repl::OpTime(Timestamp(123, 4), term));
+
+ // --- Test 2 ---
+ // Create a 'getMore' response with two documents and set it as the mock response.
+ cursor.setBatchSize(2);
+ auto getMoreResponseMsg = mockGetMoreResponse(nss, cursorId, {docObj(1), docObj(2)});
+ conn.setCallResponse(getMoreResponseMsg);
+
+ // Request more results. This call should trigger the first 'getMore' request.
+ conn.clearLastSentMessage();
+ ASSERT_TRUE(cursor.more());
+ m = conn.getLastSentMessage();
+ ASSERT_FALSE(m.empty());
+ msg = OpMsg::parse(m);
+ ASSERT_EQ(StringData(msg.body.firstElement().fieldName()), "getMore") << msg.body;
+ ASSERT_EQ(msg.body["getMore"].type(), BSONType::NumberLong) << msg.body;
+ ASSERT_EQ(msg.body["getMore"].numberLong(), cursorId) << msg.body;
+ // Make sure the correct awaitData timeout is sent.
+ ASSERT_EQ(msg.body["maxTimeMS"].number(), 5000) << msg.body;
+ // Make sure the correct term is sent.
+ ASSERT_EQ(msg.body["term"].numberLong(), term) << msg.body;
+ // Make sure the correct lastKnownCommittedOpTime is sent.
+ ASSERT_EQ(msg.body["lastKnownCommittedOpTime"]["ts"].timestamp(), Timestamp(123, 4))
+ << msg.body;
+ ASSERT_EQ(msg.body["lastKnownCommittedOpTime"]["t"].numberLong(), term) << msg.body;
+ ASSERT_BSONOBJ_EQ(docObj(1), cursor.next());
+ ASSERT_BSONOBJ_EQ(docObj(2), cursor.next());
+ ASSERT_FALSE(cursor.moreInCurrentBatch());
+ ASSERT_FALSE(cursor.isDead());
+}
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/query/query_request.cpp b/src/mongo/db/query/query_request.cpp
index 43bd5d0bd74..cd308aae77a 100644
--- a/src/mongo/db/query/query_request.cpp
+++ b/src/mongo/db/query/query_request.cpp
@@ -79,40 +79,40 @@ Status checkFieldType(const BSONElement& el, BSONType type) {
return Status::OK();
}
+} // namespace
+
// Find command field names.
-const char kFilterField[] = "filter";
-const char kProjectionField[] = "projection";
-const char kSortField[] = "sort";
-const char kHintField[] = "hint";
-const char kCollationField[] = "collation";
-const char kSkipField[] = "skip";
-const char kLimitField[] = "limit";
-const char kBatchSizeField[] = "batchSize";
-const char kNToReturnField[] = "ntoreturn";
-const char kSingleBatchField[] = "singleBatch";
-const char kMaxField[] = "max";
-const char kMinField[] = "min";
-const char kReturnKeyField[] = "returnKey";
-const char kShowRecordIdField[] = "showRecordId";
-const char kTailableField[] = "tailable";
-const char kOplogReplayField[] = "oplogReplay";
-const char kNoCursorTimeoutField[] = "noCursorTimeout";
-const char kAwaitDataField[] = "awaitData";
-const char kPartialResultsField[] = "allowPartialResults";
-const char kRuntimeConstantsField[] = "runtimeConstants";
-const char kTermField[] = "term";
-const char kOptionsField[] = "options";
-const char kReadOnceField[] = "readOnce";
-const char kAllowSpeculativeMajorityReadField[] = "allowSpeculativeMajorityRead";
-const char kInternalReadAtClusterTimeField[] = "$_internalReadAtClusterTime";
-const char kRequestResumeTokenField[] = "$_requestResumeToken";
-const char kResumeAfterField[] = "$_resumeAfter";
-const char kUse44SortKeys[] = "_use44SortKeys";
+const char QueryRequest::kFilterField[] = "filter";
+const char QueryRequest::kProjectionField[] = "projection";
+const char QueryRequest::kSortField[] = "sort";
+const char QueryRequest::kHintField[] = "hint";
+const char QueryRequest::kCollationField[] = "collation";
+const char QueryRequest::kSkipField[] = "skip";
+const char QueryRequest::kLimitField[] = "limit";
+const char QueryRequest::kBatchSizeField[] = "batchSize";
+const char QueryRequest::kNToReturnField[] = "ntoreturn";
+const char QueryRequest::kSingleBatchField[] = "singleBatch";
+const char QueryRequest::kMaxField[] = "max";
+const char QueryRequest::kMinField[] = "min";
+const char QueryRequest::kReturnKeyField[] = "returnKey";
+const char QueryRequest::kShowRecordIdField[] = "showRecordId";
+const char QueryRequest::kTailableField[] = "tailable";
+const char QueryRequest::kOplogReplayField[] = "oplogReplay";
+const char QueryRequest::kNoCursorTimeoutField[] = "noCursorTimeout";
+const char QueryRequest::kAwaitDataField[] = "awaitData";
+const char QueryRequest::kPartialResultsField[] = "allowPartialResults";
+const char QueryRequest::kRuntimeConstantsField[] = "runtimeConstants";
+const char QueryRequest::kTermField[] = "term";
+const char QueryRequest::kOptionsField[] = "options";
+const char QueryRequest::kReadOnceField[] = "readOnce";
+const char QueryRequest::kAllowSpeculativeMajorityReadField[] = "allowSpeculativeMajorityRead";
+const char QueryRequest::kInternalReadAtClusterTimeField[] = "$_internalReadAtClusterTime";
+const char QueryRequest::kRequestResumeTokenField[] = "$_requestResumeToken";
+const char QueryRequest::kResumeAfterField[] = "$_resumeAfter";
+const char QueryRequest::kUse44SortKeys[] = "_use44SortKeys";
// Field names for sorting options.
-const char kNaturalSortField[] = "$natural";
-
-} // namespace
+const char QueryRequest::kNaturalSortField[] = "$natural";
const char QueryRequest::kFindCommandName[] = "find";
const char QueryRequest::kShardVersionField[] = "shardVersion";
diff --git a/src/mongo/db/query/query_request.h b/src/mongo/db/query/query_request.h
index ae2ed26036b..490317960ce 100644
--- a/src/mongo/db/query/query_request.h
+++ b/src/mongo/db/query/query_request.h
@@ -52,6 +52,37 @@ class StatusWith;
*/
class QueryRequest {
public:
+ static const char kFilterField[];
+ static const char kProjectionField[];
+ static const char kSortField[];
+ static const char kHintField[];
+ static const char kCollationField[];
+ static const char kSkipField[];
+ static const char kLimitField[];
+ static const char kBatchSizeField[];
+ static const char kNToReturnField[];
+ static const char kSingleBatchField[];
+ static const char kMaxField[];
+ static const char kMinField[];
+ static const char kReturnKeyField[];
+ static const char kShowRecordIdField[];
+ static const char kTailableField[];
+ static const char kOplogReplayField[];
+ static const char kNoCursorTimeoutField[];
+ static const char kAwaitDataField[];
+ static const char kPartialResultsField[];
+ static const char kRuntimeConstantsField[];
+ static const char kTermField[];
+ static const char kOptionsField[];
+ static const char kReadOnceField[];
+ static const char kAllowSpeculativeMajorityReadField[];
+ static const char kInternalReadAtClusterTimeField[];
+ static const char kRequestResumeTokenField[];
+ static const char kResumeAfterField[];
+ static const char kUse44SortKeys[];
+
+ static const char kNaturalSortField[];
+
static const char kFindCommandName[];
static const char kShardVersionField[];