diff options
author | Lingzhi Deng <lingzhi.deng@mongodb.com> | 2019-12-19 21:38:39 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-12-19 21:38:39 +0000 |
commit | d1ba3bc8890f54e5dad91da85ce56626859c166c (patch) | |
tree | 4dac59147e0774516f743715424ecf6765609f39 /src | |
parent | b4db881a18cbe15127a5a60c971cd393e0621466 (diff) | |
download | mongo-d1ba3bc8890f54e5dad91da85ce56626859c166c.tar.gz |
SERVER-45232: Support oplog query in DBClientCursor
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/client/dbclient_cursor.cpp | 12 | ||||
-rw-r--r-- | src/mongo/client/dbclient_cursor.h | 12 | ||||
-rw-r--r-- | src/mongo/client/dbclient_cursor_test.cpp | 84 | ||||
-rw-r--r-- | src/mongo/db/query/query_request.cpp | 62 | ||||
-rw-r--r-- | src/mongo/db/query/query_request.h | 31 |
5 files changed, 168 insertions, 33 deletions
diff --git a/src/mongo/client/dbclient_cursor.cpp b/src/mongo/client/dbclient_cursor.cpp index df26ca3e9c6..fe677d9f00c 100644 --- a/src/mongo/client/dbclient_cursor.cpp +++ b/src/mongo/client/dbclient_cursor.cpp @@ -131,6 +131,14 @@ Message DBClientCursor::_assembleInit() { // Legacy queries don't handle readOnce. qr.getValue()->setReadOnce(true); } + if (auto replTerm = query[QueryRequest::kTermField]) { + // Legacy queries don't handle term. + qr.getValue()->setReplicationTerm(replTerm.numberLong()); + } + if (auto readConcern = query[repl::ReadConcernArgs::kReadConcernFieldName]) { + // Legacy queries don't handle readConcern. + qr.getValue()->setReadConcern(readConcern.Obj()); + } BSONObj cmd = _nsOrUuid.uuid() ? qr.getValue()->asFindCommandWithUuid() : qr.getValue()->asFindCommand(); if (auto readPref = query["$readPreference"]) { @@ -164,8 +172,8 @@ Message DBClientCursor::_assembleGetMore() { boost::make_optional(batchSize != 0, batchSize), boost::make_optional(tailableAwaitData(), _awaitDataTimeout), // awaitDataTimeout - boost::none, // term - boost::none); // lastKnownCommittedOptime + _term, + _lastKnownCommittedOpTime); auto msg = assembleCommandRequest(_client, ns.db(), opts, gmr.toBSON()); // Set the exhaust flag if needed. if (opts & QueryOption_Exhaust && msg.operation() == dbMsg) { diff --git a/src/mongo/client/dbclient_cursor.h b/src/mongo/client/dbclient_cursor.h index 4d7dd74dd1b..e8c7d91f8b8 100644 --- a/src/mongo/client/dbclient_cursor.h +++ b/src/mongo/client/dbclient_cursor.h @@ -235,6 +235,16 @@ public: _awaitDataTimeout = timeout; } + // Only used for tailable awaitData oplog fetching requests. + void setCurrentTermAndLastCommittedOpTime( + const boost::optional<long long>& term, + const boost::optional<repl::OpTime>& lastCommittedOpTime) { + invariant(tailableAwaitData()); + invariant(ns == NamespaceString::kRsOplogNamespace); + _term = term; + _lastKnownCommittedOpTime = lastCommittedOpTime; + } + protected: struct Batch { // TODO remove constructors after c++17 toolchain upgrade @@ -288,6 +298,8 @@ private: bool _connectionHasPendingReplies = false; int _lastRequestId = 0; Milliseconds _awaitDataTimeout = Milliseconds{0}; + boost::optional<long long> _term; + boost::optional<repl::OpTime> _lastKnownCommittedOpTime; void dataReceived(const Message& reply) { bool retry; diff --git a/src/mongo/client/dbclient_cursor_test.cpp b/src/mongo/client/dbclient_cursor_test.cpp index d9f42d1ae7f..9df9dd01405 100644 --- a/src/mongo/client/dbclient_cursor_test.cpp +++ b/src/mongo/client/dbclient_cursor_test.cpp @@ -754,6 +754,90 @@ TEST_F(DBClientCursorTest, DBClientCursorTailableAwaitDataExhaust) { ASSERT_TRUE(conn.getLastSentMessage().empty()); } +TEST_F(DBClientCursorTest, DBClientCursorOplogQuery) { + // This tests DBClientCursor supports oplog query with special fields in the command request. + // 1. Initial find command has "filter", "tailable", "awaitData", "oplogReplay", "maxTimeMS", + // "batchSize", "term" and "readConcern" fields set. + // 2. A subsequent getMore command sets awaitData timeout and lastKnownCommittedOpTime + // correctly. + + // Set up the DBClientCursor and a mock client connection. + DBClientConnectionForTest conn; + const NamespaceString nss = NamespaceString::kRsOplogNamespace; + const BSONObj filterObj = BSON("ts" << BSON("$gte" << Timestamp(123, 4))); + const BSONObj readConcernObj = BSON("afterClusterTime" << Timestamp(0, 1)); + const long long maxTimeMS = 5000LL; + const long long term = 5; + const auto oplogQuery = QUERY("query" << filterObj << "readConcern" << readConcernObj + << "$maxTimeMS" << maxTimeMS << "term" << term); + + DBClientCursor cursor(&conn, + NamespaceStringOrUUID(nss), + oplogQuery.obj, + 0, + 0, + nullptr, + QueryOption_CursorTailable | QueryOption_AwaitData | + QueryOption_OplogReplay, + 0); + cursor.setBatchSize(0); + + // Set up mock 'find' response. + const long long cursorId = 42; + Message findResponseMsg = mockFindResponse(nss, cursorId, {}); + + conn.setCallResponse(findResponseMsg); + ASSERT(cursor.init()); + + // --- Test 1 --- + // Verify that the initial 'find' request was sent. + auto m = conn.getLastSentMessage(); + ASSERT_FALSE(m.empty()); + auto msg = OpMsg::parse(m); + ASSERT_EQ(OpMsg::flags(m), OpMsg::kChecksumPresent); + ASSERT_EQ(msg.body.getStringField("find"), nss.coll()) << msg.body; + ASSERT_BSONOBJ_EQ(msg.body["filter"].Obj(), filterObj); + ASSERT_TRUE(msg.body.getBoolField("tailable")) << msg.body; + ASSERT_TRUE(msg.body.getBoolField("awaitData")) << msg.body; + ASSERT_TRUE(msg.body.getBoolField("oplogReplay")) << msg.body; + ASSERT_EQ(msg.body["maxTimeMS"].numberLong(), maxTimeMS) << msg.body; + ASSERT_EQ(msg.body["batchSize"].number(), 0) << msg.body; + ASSERT_EQ(msg.body["term"].numberLong(), term) << msg.body; + ASSERT_BSONOBJ_EQ(msg.body["readConcern"].Obj(), readConcernObj); + + cursor.setAwaitDataTimeoutMS(Milliseconds{5000}); + ASSERT_EQ(cursor.getAwaitDataTimeoutMS(), Milliseconds{5000}); + + cursor.setCurrentTermAndLastCommittedOpTime(term, repl::OpTime(Timestamp(123, 4), term)); + + // --- Test 2 --- + // Create a 'getMore' response with two documents and set it as the mock response. + cursor.setBatchSize(2); + auto getMoreResponseMsg = mockGetMoreResponse(nss, cursorId, {docObj(1), docObj(2)}); + conn.setCallResponse(getMoreResponseMsg); + + // Request more results. This call should trigger the first 'getMore' request. + conn.clearLastSentMessage(); + ASSERT_TRUE(cursor.more()); + m = conn.getLastSentMessage(); + ASSERT_FALSE(m.empty()); + msg = OpMsg::parse(m); + ASSERT_EQ(StringData(msg.body.firstElement().fieldName()), "getMore") << msg.body; + ASSERT_EQ(msg.body["getMore"].type(), BSONType::NumberLong) << msg.body; + ASSERT_EQ(msg.body["getMore"].numberLong(), cursorId) << msg.body; + // Make sure the correct awaitData timeout is sent. + ASSERT_EQ(msg.body["maxTimeMS"].number(), 5000) << msg.body; + // Make sure the correct term is sent. + ASSERT_EQ(msg.body["term"].numberLong(), term) << msg.body; + // Make sure the correct lastKnownCommittedOpTime is sent. + ASSERT_EQ(msg.body["lastKnownCommittedOpTime"]["ts"].timestamp(), Timestamp(123, 4)) + << msg.body; + ASSERT_EQ(msg.body["lastKnownCommittedOpTime"]["t"].numberLong(), term) << msg.body; + ASSERT_BSONOBJ_EQ(docObj(1), cursor.next()); + ASSERT_BSONOBJ_EQ(docObj(2), cursor.next()); + ASSERT_FALSE(cursor.moreInCurrentBatch()); + ASSERT_FALSE(cursor.isDead()); +} } // namespace } // namespace mongo diff --git a/src/mongo/db/query/query_request.cpp b/src/mongo/db/query/query_request.cpp index 43bd5d0bd74..cd308aae77a 100644 --- a/src/mongo/db/query/query_request.cpp +++ b/src/mongo/db/query/query_request.cpp @@ -79,40 +79,40 @@ Status checkFieldType(const BSONElement& el, BSONType type) { return Status::OK(); } +} // namespace + // Find command field names. -const char kFilterField[] = "filter"; -const char kProjectionField[] = "projection"; -const char kSortField[] = "sort"; -const char kHintField[] = "hint"; -const char kCollationField[] = "collation"; -const char kSkipField[] = "skip"; -const char kLimitField[] = "limit"; -const char kBatchSizeField[] = "batchSize"; -const char kNToReturnField[] = "ntoreturn"; -const char kSingleBatchField[] = "singleBatch"; -const char kMaxField[] = "max"; -const char kMinField[] = "min"; -const char kReturnKeyField[] = "returnKey"; -const char kShowRecordIdField[] = "showRecordId"; -const char kTailableField[] = "tailable"; -const char kOplogReplayField[] = "oplogReplay"; -const char kNoCursorTimeoutField[] = "noCursorTimeout"; -const char kAwaitDataField[] = "awaitData"; -const char kPartialResultsField[] = "allowPartialResults"; -const char kRuntimeConstantsField[] = "runtimeConstants"; -const char kTermField[] = "term"; -const char kOptionsField[] = "options"; -const char kReadOnceField[] = "readOnce"; -const char kAllowSpeculativeMajorityReadField[] = "allowSpeculativeMajorityRead"; -const char kInternalReadAtClusterTimeField[] = "$_internalReadAtClusterTime"; -const char kRequestResumeTokenField[] = "$_requestResumeToken"; -const char kResumeAfterField[] = "$_resumeAfter"; -const char kUse44SortKeys[] = "_use44SortKeys"; +const char QueryRequest::kFilterField[] = "filter"; +const char QueryRequest::kProjectionField[] = "projection"; +const char QueryRequest::kSortField[] = "sort"; +const char QueryRequest::kHintField[] = "hint"; +const char QueryRequest::kCollationField[] = "collation"; +const char QueryRequest::kSkipField[] = "skip"; +const char QueryRequest::kLimitField[] = "limit"; +const char QueryRequest::kBatchSizeField[] = "batchSize"; +const char QueryRequest::kNToReturnField[] = "ntoreturn"; +const char QueryRequest::kSingleBatchField[] = "singleBatch"; +const char QueryRequest::kMaxField[] = "max"; +const char QueryRequest::kMinField[] = "min"; +const char QueryRequest::kReturnKeyField[] = "returnKey"; +const char QueryRequest::kShowRecordIdField[] = "showRecordId"; +const char QueryRequest::kTailableField[] = "tailable"; +const char QueryRequest::kOplogReplayField[] = "oplogReplay"; +const char QueryRequest::kNoCursorTimeoutField[] = "noCursorTimeout"; +const char QueryRequest::kAwaitDataField[] = "awaitData"; +const char QueryRequest::kPartialResultsField[] = "allowPartialResults"; +const char QueryRequest::kRuntimeConstantsField[] = "runtimeConstants"; +const char QueryRequest::kTermField[] = "term"; +const char QueryRequest::kOptionsField[] = "options"; +const char QueryRequest::kReadOnceField[] = "readOnce"; +const char QueryRequest::kAllowSpeculativeMajorityReadField[] = "allowSpeculativeMajorityRead"; +const char QueryRequest::kInternalReadAtClusterTimeField[] = "$_internalReadAtClusterTime"; +const char QueryRequest::kRequestResumeTokenField[] = "$_requestResumeToken"; +const char QueryRequest::kResumeAfterField[] = "$_resumeAfter"; +const char QueryRequest::kUse44SortKeys[] = "_use44SortKeys"; // Field names for sorting options. -const char kNaturalSortField[] = "$natural"; - -} // namespace +const char QueryRequest::kNaturalSortField[] = "$natural"; const char QueryRequest::kFindCommandName[] = "find"; const char QueryRequest::kShardVersionField[] = "shardVersion"; diff --git a/src/mongo/db/query/query_request.h b/src/mongo/db/query/query_request.h index ae2ed26036b..490317960ce 100644 --- a/src/mongo/db/query/query_request.h +++ b/src/mongo/db/query/query_request.h @@ -52,6 +52,37 @@ class StatusWith; */ class QueryRequest { public: + static const char kFilterField[]; + static const char kProjectionField[]; + static const char kSortField[]; + static const char kHintField[]; + static const char kCollationField[]; + static const char kSkipField[]; + static const char kLimitField[]; + static const char kBatchSizeField[]; + static const char kNToReturnField[]; + static const char kSingleBatchField[]; + static const char kMaxField[]; + static const char kMinField[]; + static const char kReturnKeyField[]; + static const char kShowRecordIdField[]; + static const char kTailableField[]; + static const char kOplogReplayField[]; + static const char kNoCursorTimeoutField[]; + static const char kAwaitDataField[]; + static const char kPartialResultsField[]; + static const char kRuntimeConstantsField[]; + static const char kTermField[]; + static const char kOptionsField[]; + static const char kReadOnceField[]; + static const char kAllowSpeculativeMajorityReadField[]; + static const char kInternalReadAtClusterTimeField[]; + static const char kRequestResumeTokenField[]; + static const char kResumeAfterField[]; + static const char kUse44SortKeys[]; + + static const char kNaturalSortField[]; + static const char kFindCommandName[]; static const char kShardVersionField[]; |