From fbcee2558090f25bcaa00879b415f018b7da058b Mon Sep 17 00:00:00 2001 From: David Storch Date: Tue, 18 Jan 2022 15:14:46 +0000 Subject: SERVER-62147 Fix broken OP_QUERY exhaust cursor implementation (cherry picked from commit fb4b3eba611b3bc2408cc3e86fa1d1cba9085fde) --- ...ca_sets_kill_secondaries_jscore_passthrough.yml | 1 + etc/backports_required_for_multiversion_tests.yml | 2 + jstests/core/exhaust.js | 43 ++++++++++-- src/mongo/db/clientcursor.h | 12 +++- src/mongo/db/commands/find_cmd.cpp | 3 +- src/mongo/db/commands/list_collections.cpp | 3 +- src/mongo/db/commands/list_indexes.cpp | 3 +- src/mongo/db/commands/run_aggregate.cpp | 3 +- src/mongo/db/query/find.cpp | 21 +++--- src/mongo/dbtests/cursor_manager_test.cpp | 33 ++++++---- src/mongo/rpc/op_msg_integration_test.cpp | 76 ++++++++++++++++++++++ src/mongo/transport/service_state_machine.cpp | 2 +- 12 files changed, 169 insertions(+), 33 deletions(-) diff --git a/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml index 3ccf503618d..aaa9494d0f5 100644 --- a/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml @@ -18,6 +18,7 @@ selector: # primary's oplog when run as a part of burn_in_tests. - jstests/core/max_doc_size.js - jstests/core/mr_bigobject.js + - jstests/core/exhaust.js # The following tests also create large oplog entries due to the maximum blocking sort size being # 100 MB. 
- jstests/core/explain_execution_error.js diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml index 9604bccbed2..6b0b4fa9a14 100644 --- a/etc/backports_required_for_multiversion_tests.yml +++ b/etc/backports_required_for_multiversion_tests.yml @@ -193,6 +193,8 @@ all: test_file: jstests/auth/dbcheck.js - ticket: SERVER-62212 test_file: jstests/replsets/dbcheck_write_concern.js + - ticket: SERVER-62147 + test_file: jstests/core/exhaust.js suites: diff --git a/jstests/core/exhaust.js b/jstests/core/exhaust.js index 7e6c139d367..bc1ce387157 100644 --- a/jstests/core/exhaust.js +++ b/jstests/core/exhaust.js @@ -3,23 +3,56 @@ (function() { 'use strict'; -var c = db.exhaustColl; -c.drop(); +load("jstests/libs/fixture_helpers.js"); + +var coll = db.exhaustColl; +coll.drop(); const docCount = 4; for (var i = 0; i < docCount; i++) { - assert.commandWorked(c.insert({a: i})); + assert.commandWorked(coll.insert({a: i})); } // Check that the query works without exhaust set -assert.eq(c.find().batchSize(1).itcount(), docCount); +assert.eq(coll.find().batchSize(1).itcount(), docCount); // Now try to run the same query with exhaust try { - assert.eq(c.find().batchSize(1).addOption(DBQuery.Option.exhaust).itcount(), docCount); + assert.eq(coll.find().batchSize(1).addOption(DBQuery.Option.exhaust).itcount(), docCount); } catch (e) { // The exhaust option is not valid against mongos, ensure that this query throws the right // code assert.eq(e.code, 18526, () => tojson(e)); } + +// Test a case where the amount of data requires a response to the initial find operation as well as +// three getMore reply batches. +(function() { +// Skip this test case if we are connected to a mongos, since exhaust queries generally aren't +// expected to work against a mongos. +if (FixtureHelpers.isMongos(db)) { + return; +} + +coll.drop(); + +// Include a long string in each document so that the documents are a bit bigger than 16KB. 
+const strSize = 16 * 1024; +// The docs are ~16KB and each getMore response is 16MB. Therefore, a full getMore batch will +// contain about 1000 documents. Since the initial find response is limited to only 101 documents, +// by inserting 3000 we ensure that three subsequent getMore replies are required. Roughly speaking, +// the initial reply will consist of the first 100, the first getMore reply 1000 more, then another +// 1000, and then the remaining 900. +const numDocs = 3000; + +const str = "A".repeat(strSize); + +let bulk = coll.initializeUnorderedBulkOp(); +for (let i = 0; i < numDocs; ++i) { + bulk.insert({key: str}); +} +assert.commandWorked(bulk.execute()); + +assert.eq(numDocs, coll.find().addOption(DBQuery.Option.exhaust).itcount()); +}()); }()); diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h index 5979ed4521c..84f1cff4650 100644 --- a/src/mongo/db/clientcursor.h +++ b/src/mongo/db/clientcursor.h @@ -83,7 +83,8 @@ struct ClientCursorParams { BSONObj originatingCommandObj, LockPolicy lockPolicy, PrivilegeVector originatingPrivileges, - bool needsMerge) + bool needsMerge, + bool isOpQueryExhaust) : exec(std::move(planExecutor)), nss(std::move(nss)), writeConcernOptions(std::move(writeConcernOptions)), @@ -98,6 +99,15 @@ struct ClientCursorParams { while (authenticatedUsersIter.more()) { authenticatedUsers.emplace_back(authenticatedUsersIter.next()); } + + // There are separate implementations for exhaust cursors using OP_QUERY and exhaust cursors + // using OP_MSG. In the case of OP_QUERY exhaust, the OP_GET_MORE code consults the cursor's + // 'queryOptions' bit vector in order to determine whether another getMore should run again + // immediately without waiting for a request from the client. Here we set the exhaust bit if + // the cursor is being registered as an OP_QUERY exhaust cursor. 
+ if (isOpQueryExhaust) { + queryOptions |= QueryOption_Exhaust; + } } void setTailable(bool tailable) { diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 782d76b8028..a70970406c5 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -621,7 +621,8 @@ public: _request.body, ClientCursorParams::LockPolicy::kLockExternally, {Privilege(ResourcePattern::forExactNamespace(nss), ActionType::find)}, - expCtx->needsMerge}); + expCtx->needsMerge, + false /*isOpQueryExhaust*/}); cursorId = pinnedCursor.getCursor()->cursorid(); invariant(!exec); diff --git a/src/mongo/db/commands/list_collections.cpp b/src/mongo/db/commands/list_collections.cpp index fae03f2159a..b6a7d73d8db 100644 --- a/src/mongo/db/commands/list_collections.cpp +++ b/src/mongo/db/commands/list_collections.cpp @@ -404,7 +404,8 @@ public: ClientCursorParams::LockPolicy::kLocksInternally, uassertStatusOK(AuthorizationSession::get(opCtx->getClient()) ->checkAuthorizedToListCollections(dbname, jsobj)), - false // needsMerge always 'false' for listCollections. + false, // needsMerge always 'false' for listCollections. + false // isOpQueryExhaust }); appendCursorResponseObject( diff --git a/src/mongo/db/commands/list_indexes.cpp b/src/mongo/db/commands/list_indexes.cpp index 0c3cf1c3053..fbf985a6b05 100644 --- a/src/mongo/db/commands/list_indexes.cpp +++ b/src/mongo/db/commands/list_indexes.cpp @@ -215,7 +215,8 @@ public: cmdObj, ClientCursorParams::LockPolicy::kLocksInternally, {Privilege(ResourcePattern::forExactNamespace(nss), ActionType::listIndexes)}, - false // needsMerge always 'false' for listIndexes. + false, // needsMerge always 'false' for listIndexes. 
+ false // isOpQueryExhaust }); appendCursorResponseObject( diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 93ef6659d87..a98b48f43cb 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -796,7 +796,8 @@ Status runAggregate(OperationContext* opCtx, cmdObj, lockPolicy, privileges, - expCtx->needsMerge); + expCtx->needsMerge, + false /*isOpQueryExhaust*/); if (expCtx->tailableMode == TailableModeEnum::kTailable) { cursorParams.setTailable(true); } else if (expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) { diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index 10ee17cb3ca..0648cfb0011 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -777,17 +777,16 @@ bool runQuery(OperationContext* opCtx, // Allocate a new ClientCursor and register it with the cursor manager. ClientCursorPin pinnedCursor = CursorManager::get(opCtx)->registerCursor( opCtx, - { - std::move(exec), - nss, - AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), - opCtx->getWriteConcern(), - readConcernArgs, - upconvertedQuery, - ClientCursorParams::LockPolicy::kLockExternally, - {Privilege(ResourcePattern::forExactNamespace(nss), ActionType::find)}, - false // needsMerge always 'false' for find(). - }); + {std::move(exec), + nss, + AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), + opCtx->getWriteConcern(), + readConcernArgs, + upconvertedQuery, + ClientCursorParams::LockPolicy::kLockExternally, + {Privilege(ResourcePattern::forExactNamespace(nss), ActionType::find)}, + false, // needsMerge always 'false' for find(). 
+ opCtx->isExhaust()}); ccId = pinnedCursor.getCursor()->cursorid(); LOGV2_DEBUG( diff --git a/src/mongo/dbtests/cursor_manager_test.cpp b/src/mongo/dbtests/cursor_manager_test.cpp index 124de6ead10..12234f50794 100644 --- a/src/mongo/dbtests/cursor_manager_test.cpp +++ b/src/mongo/dbtests/cursor_manager_test.cpp @@ -94,7 +94,8 @@ public: BSONObj(), ClientCursorParams::LockPolicy::kLocksInternally, PrivilegeVector(), - false // needsMerge + false, // needsMerge + false // isOpQueryExhaust }; } @@ -148,7 +149,8 @@ TEST_F(CursorManagerTest, ShouldBeAbleToKillPinnedCursor) { BSONObj(), ClientCursorParams::LockPolicy::kLocksInternally, PrivilegeVector(), - false // needsMerge + false, // needsMerge + false // isOpQueryExhaust }); auto cursorId = cursorPin.getCursor()->cursorid(); @@ -178,7 +180,8 @@ TEST_F(CursorManagerTest, ShouldBeAbleToKillPinnedCursorMultiClient) { BSONObj(), ClientCursorParams::LockPolicy::kLocksInternally, PrivilegeVector(), - false // needsMerge + false, // needsMerge + false // isOpQueryExhaust }); auto cursorId = cursorPin.getCursor()->cursorid(); @@ -219,7 +222,8 @@ TEST_F(CursorManagerTest, InactiveCursorShouldTimeout) { BSONObj(), ClientCursorParams::LockPolicy::kLocksInternally, PrivilegeVector(), - false // needsMerge + false, // needsMerge + false // isOpQueryExhaust }); ASSERT_EQ(0UL, cursorManager->timeoutCursors(_opCtx.get(), Date_t())); @@ -239,7 +243,8 @@ TEST_F(CursorManagerTest, InactiveCursorShouldTimeout) { BSONObj(), ClientCursorParams::LockPolicy::kLocksInternally, PrivilegeVector(), - false // needsMerge + false, // needsMerge + false // isOpQueryExhaust }); ASSERT_EQ(1UL, cursorManager->timeoutCursors(_opCtx.get(), Date_t::max())); ASSERT_EQ(0UL, cursorManager->numCursors()); @@ -263,7 +268,8 @@ TEST_F(CursorManagerTest, InactivePinnedCursorShouldNotTimeout) { BSONObj(), ClientCursorParams::LockPolicy::kLocksInternally, PrivilegeVector(), - false // needsMerge + false, // needsMerge + false // isOpQueryExhaust }); // The pin 
is still in scope, so it should not time out. @@ -291,7 +297,8 @@ TEST_F(CursorManagerTest, MarkedAsKilledCursorsShouldBeDeletedOnCursorPin) { BSONObj(), ClientCursorParams::LockPolicy::kLocksInternally, PrivilegeVector(), - false // needsMerge + false, // needsMerge + false // isOpQueryExhaust }); auto cursorId = cursorPin->cursorid(); @@ -328,7 +335,8 @@ TEST_F(CursorManagerTest, InactiveKilledCursorsShouldTimeout) { BSONObj(), ClientCursorParams::LockPolicy::kLocksInternally, PrivilegeVector(), - false // needsMerge + false, // needsMerge + false // isOpQueryExhaust }); // A cursor will stay alive, but be marked as killed, if it is interrupted with a code other @@ -364,7 +372,8 @@ TEST_F(CursorManagerTest, UsingACursorShouldUpdateTimeOfLastUse) { BSONObj(), ClientCursorParams::LockPolicy::kLocksInternally, PrivilegeVector(), - false // needsMerge + false, // needsMerge + false // isOpQueryExhaust }); auto usedCursorId = cursorPin.getCursor()->cursorid(); cursorPin.release(); @@ -382,7 +391,8 @@ TEST_F(CursorManagerTest, UsingACursorShouldUpdateTimeOfLastUse) { BSONObj(), ClientCursorParams::LockPolicy::kLocksInternally, PrivilegeVector(), - false // needsMerge + false, // needsMerge + false // isOpQueryExhaust }); // Advance the clock to simulate time passing. @@ -423,7 +433,8 @@ TEST_F(CursorManagerTest, CursorShouldNotTimeOutUntilIdleForLongEnoughAfterBeing BSONObj(), ClientCursorParams::LockPolicy::kLocksInternally, PrivilegeVector(), - false // needsMerge + false, // needsMerge + false // isOpQueryExhaust }); // Advance the clock to simulate time passing. 
diff --git a/src/mongo/rpc/op_msg_integration_test.cpp b/src/mongo/rpc/op_msg_integration_test.cpp index 619ccd1f594..dc536939f3b 100644 --- a/src/mongo/rpc/op_msg_integration_test.cpp +++ b/src/mongo/rpc/op_msg_integration_test.cpp @@ -555,6 +555,82 @@ TEST(OpMsg, MongosIgnoresExhaustForGetMore) { ASSERT_BSONOBJ_EQ(nextBatch[1].embeddedObject(), BSON("_id" << 1)); } +TEST(OpMsg, ExhaustWorksForAggCursor) { + std::string errMsg; + auto conn = std::unique_ptr<DBClientBase>( + unittest::getFixtureConnectionString().connect("integration_test", errMsg)); + uassert(ErrorCodes::SocketException, errMsg, conn); + + // Only test exhaust against a standalone. + if (conn->isReplicaSetMember() || conn->isMongos()) { + return; + } + + NamespaceString nss("test", "coll"); + + conn->dropCollection(nss.toString()); + + // Insert 5 documents so that a cursor using a batchSize of 2 requires three batches to get all + // the results. + for (int i = 0; i < 5; i++) { + conn->insert(nss.toString(), BSON("_id" << i)); + } + + // Issue an agg request to open a cursor but return 0 documents. Specify a sort in order to + // guarantee their return order. + auto aggCmd = BSON("aggregate" << nss.coll() << "cursor" << BSON("batchSize" << 0) << "pipeline" + << BSON_ARRAY(BSON("$sort" << BSON("_id" << 1)))); + auto opMsgRequest = OpMsgRequest::fromDBAndBody(nss.db(), aggCmd); + auto request = opMsgRequest.serialize(); + + Message reply; + ASSERT(conn->call(request, reply)); + auto res = OpMsg::parse(reply).body; + const long long cursorId = res["cursor"]["id"].numberLong(); + ASSERT(res["cursor"]["firstBatch"].Array().empty()); + ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + + // Construct getMore request with exhaust flag. Set batch size so we will need multiple batches + // to exhaust the cursor. 
+ int batchSize = 2; + GetMoreRequest gmr(nss, cursorId, batchSize, boost::none, boost::none, boost::none); + opMsgRequest = OpMsgRequest::fromDBAndBody(nss.db(), gmr.toBSON()); + request = opMsgRequest.serialize(); + OpMsg::setFlag(&request, OpMsg::kExhaustSupported); + + auto assertNextBatch = + [](const Message& msg, CursorId expectedCursorId, std::vector<BSONObj> expectedBatch) { + auto cmdReply = OpMsg::parse(msg).body; + ASSERT_OK(getStatusFromCommandResult(cmdReply)); + ASSERT_EQ(cmdReply["cursor"]["id"].numberLong(), expectedCursorId); + std::vector<BSONElement> nextBatch = cmdReply["cursor"]["nextBatch"].Array(); + ASSERT_EQ(nextBatch.size(), expectedBatch.size()); + auto it = expectedBatch.begin(); + for (auto&& batchElt : nextBatch) { + ASSERT(it != expectedBatch.end()); + ASSERT_BSONOBJ_EQ(batchElt.embeddedObject(), *it); + ++it; + } + }; + + // Run getMore to initiate the exhaust stream. + ASSERT(conn->call(request, reply)); + auto lastRequestId = reply.header().getId(); + ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + assertNextBatch(reply, cursorId, {BSON("_id" << 0), BSON("_id" << 1)}); + + // Receive next exhaust batch. + ASSERT_OK(conn->recv(reply, lastRequestId)); + lastRequestId = reply.header().getId(); + ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + assertNextBatch(reply, cursorId, {BSON("_id" << 2), BSON("_id" << 3)}); + + // Receive terminal batch. 
+ ASSERT_OK(conn->recv(reply, lastRequestId)); + ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + assertNextBatch(reply, 0, {BSON("_id" << 4)}); +} + TEST(OpMsg, ServerHandlesExhaustIsMasterCorrectly) { std::string errMsg; auto fixtureConn = std::unique_ptr<DBClientBase>( diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp index ac67e983765..42a633829af 100644 --- a/src/mongo/transport/service_state_machine.cpp +++ b/src/mongo/transport/service_state_machine.cpp @@ -113,7 +113,7 @@ Message makeLegacyExhaustMessage(Message* m, const DbResponse& dbresponse) { * 'synthetic' exhaust request. Returns an empty message if exhaust is not allowed. */ Message makeExhaustMessage(Message requestMsg, DbResponse* dbresponse) { - if (requestMsg.operation() == dbQuery) { + if (requestMsg.operation() == dbQuery || requestMsg.operation() == dbGetMore) { return makeLegacyExhaustMessage(&requestMsg, *dbresponse); } -- cgit v1.2.1