diff options
Diffstat (limited to 'src/mongo/client')
-rw-r--r-- | src/mongo/client/dbclient_cursor.cpp | 12 | ||||
-rw-r--r-- | src/mongo/client/dbclient_cursor.h | 12 | ||||
-rw-r--r-- | src/mongo/client/dbclient_cursor_test.cpp | 84 |
3 files changed, 106 insertions, 2 deletions
diff --git a/src/mongo/client/dbclient_cursor.cpp b/src/mongo/client/dbclient_cursor.cpp index df26ca3e9c6..fe677d9f00c 100644 --- a/src/mongo/client/dbclient_cursor.cpp +++ b/src/mongo/client/dbclient_cursor.cpp @@ -131,6 +131,14 @@ Message DBClientCursor::_assembleInit() { // Legacy queries don't handle readOnce. qr.getValue()->setReadOnce(true); } + if (auto replTerm = query[QueryRequest::kTermField]) { + // Legacy queries don't handle term. + qr.getValue()->setReplicationTerm(replTerm.numberLong()); + } + if (auto readConcern = query[repl::ReadConcernArgs::kReadConcernFieldName]) { + // Legacy queries don't handle readConcern. + qr.getValue()->setReadConcern(readConcern.Obj()); + } BSONObj cmd = _nsOrUuid.uuid() ? qr.getValue()->asFindCommandWithUuid() : qr.getValue()->asFindCommand(); if (auto readPref = query["$readPreference"]) { @@ -164,8 +172,8 @@ Message DBClientCursor::_assembleGetMore() { boost::make_optional(batchSize != 0, batchSize), boost::make_optional(tailableAwaitData(), _awaitDataTimeout), // awaitDataTimeout - boost::none, // term - boost::none); // lastKnownCommittedOptime + _term, + _lastKnownCommittedOpTime); auto msg = assembleCommandRequest(_client, ns.db(), opts, gmr.toBSON()); // Set the exhaust flag if needed. if (opts & QueryOption_Exhaust && msg.operation() == dbMsg) { diff --git a/src/mongo/client/dbclient_cursor.h b/src/mongo/client/dbclient_cursor.h index 4d7dd74dd1b..e8c7d91f8b8 100644 --- a/src/mongo/client/dbclient_cursor.h +++ b/src/mongo/client/dbclient_cursor.h @@ -235,6 +235,16 @@ public: _awaitDataTimeout = timeout; } + // Only used for tailable awaitData oplog fetching requests. + void setCurrentTermAndLastCommittedOpTime( + const boost::optional<long long>& term, + const boost::optional<repl::OpTime>& lastCommittedOpTime) { + invariant(tailableAwaitData()); + invariant(ns == NamespaceString::kRsOplogNamespace); + _term = term; + _lastKnownCommittedOpTime = lastCommittedOpTime; + } + protected: struct Batch { // TODO remove constructors after c++17 toolchain upgrade @@ -288,6 +298,8 @@ private: bool _connectionHasPendingReplies = false; int _lastRequestId = 0; Milliseconds _awaitDataTimeout = Milliseconds{0}; + boost::optional<long long> _term; + boost::optional<repl::OpTime> _lastKnownCommittedOpTime; void dataReceived(const Message& reply) { bool retry; diff --git a/src/mongo/client/dbclient_cursor_test.cpp b/src/mongo/client/dbclient_cursor_test.cpp index d9f42d1ae7f..9df9dd01405 100644 --- a/src/mongo/client/dbclient_cursor_test.cpp +++ b/src/mongo/client/dbclient_cursor_test.cpp @@ -754,6 +754,90 @@ TEST_F(DBClientCursorTest, DBClientCursorTailableAwaitDataExhaust) { ASSERT_TRUE(conn.getLastSentMessage().empty()); } +TEST_F(DBClientCursorTest, DBClientCursorOplogQuery) { + // This tests DBClientCursor supports oplog query with special fields in the command request. + // 1. Initial find command has "filter", "tailable", "awaitData", "oplogReplay", "maxTimeMS", + // "batchSize", "term" and "readConcern" fields set. + // 2. A subsequent getMore command sets awaitData timeout and lastKnownCommittedOpTime + // correctly. + + // Set up the DBClientCursor and a mock client connection. + DBClientConnectionForTest conn; + const NamespaceString nss = NamespaceString::kRsOplogNamespace; + const BSONObj filterObj = BSON("ts" << BSON("$gte" << Timestamp(123, 4))); + const BSONObj readConcernObj = BSON("afterClusterTime" << Timestamp(0, 1)); + const long long maxTimeMS = 5000LL; + const long long term = 5; + const auto oplogQuery = QUERY("query" << filterObj << "readConcern" << readConcernObj + << "$maxTimeMS" << maxTimeMS << "term" << term); + + DBClientCursor cursor(&conn, + NamespaceStringOrUUID(nss), + oplogQuery.obj, + 0, + 0, + nullptr, + QueryOption_CursorTailable | QueryOption_AwaitData | + QueryOption_OplogReplay, + 0); + cursor.setBatchSize(0); + + // Set up mock 'find' response. + const long long cursorId = 42; + Message findResponseMsg = mockFindResponse(nss, cursorId, {}); + + conn.setCallResponse(findResponseMsg); + ASSERT(cursor.init()); + + // --- Test 1 --- + // Verify that the initial 'find' request was sent. + auto m = conn.getLastSentMessage(); + ASSERT_FALSE(m.empty()); + auto msg = OpMsg::parse(m); + ASSERT_EQ(OpMsg::flags(m), OpMsg::kChecksumPresent); + ASSERT_EQ(msg.body.getStringField("find"), nss.coll()) << msg.body; + ASSERT_BSONOBJ_EQ(msg.body["filter"].Obj(), filterObj); + ASSERT_TRUE(msg.body.getBoolField("tailable")) << msg.body; + ASSERT_TRUE(msg.body.getBoolField("awaitData")) << msg.body; + ASSERT_TRUE(msg.body.getBoolField("oplogReplay")) << msg.body; + ASSERT_EQ(msg.body["maxTimeMS"].numberLong(), maxTimeMS) << msg.body; + ASSERT_EQ(msg.body["batchSize"].number(), 0) << msg.body; + ASSERT_EQ(msg.body["term"].numberLong(), term) << msg.body; + ASSERT_BSONOBJ_EQ(msg.body["readConcern"].Obj(), readConcernObj); + + cursor.setAwaitDataTimeoutMS(Milliseconds{5000}); + ASSERT_EQ(cursor.getAwaitDataTimeoutMS(), Milliseconds{5000}); + + cursor.setCurrentTermAndLastCommittedOpTime(term, repl::OpTime(Timestamp(123, 4), term)); + + // --- Test 2 --- + // Create a 'getMore' response with two documents and set it as the mock response. + cursor.setBatchSize(2); + auto getMoreResponseMsg = mockGetMoreResponse(nss, cursorId, {docObj(1), docObj(2)}); + conn.setCallResponse(getMoreResponseMsg); + + // Request more results. This call should trigger the first 'getMore' request. + conn.clearLastSentMessage(); + ASSERT_TRUE(cursor.more()); + m = conn.getLastSentMessage(); + ASSERT_FALSE(m.empty()); + msg = OpMsg::parse(m); + ASSERT_EQ(StringData(msg.body.firstElement().fieldName()), "getMore") << msg.body; + ASSERT_EQ(msg.body["getMore"].type(), BSONType::NumberLong) << msg.body; + ASSERT_EQ(msg.body["getMore"].numberLong(), cursorId) << msg.body; + // Make sure the correct awaitData timeout is sent. + ASSERT_EQ(msg.body["maxTimeMS"].number(), 5000) << msg.body; + // Make sure the correct term is sent. + ASSERT_EQ(msg.body["term"].numberLong(), term) << msg.body; + // Make sure the correct lastKnownCommittedOpTime is sent. + ASSERT_EQ(msg.body["lastKnownCommittedOpTime"]["ts"].timestamp(), Timestamp(123, 4)) + << msg.body; + ASSERT_EQ(msg.body["lastKnownCommittedOpTime"]["t"].numberLong(), term) << msg.body; + ASSERT_BSONOBJ_EQ(docObj(1), cursor.next()); + ASSERT_BSONOBJ_EQ(docObj(2), cursor.next()); + ASSERT_FALSE(cursor.moreInCurrentBatch()); + ASSERT_FALSE(cursor.isDead()); +} } // namespace } // namespace mongo |