diff options
author | David Storch <david.storch@mongodb.com> | 2022-01-20 17:21:58 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-01-20 18:28:01 +0000 |
commit | d3daee1ab48ffe0dbd0d32cafcc339cf6bbe4f30 (patch) | |
tree | 70da3b984ab38a2b197d69dbdb31b5fc94ec7e19 | |
parent | ff45c65509992857fcdb422029b02e04f383ae46 (diff) | |
download | mongo-d3daee1ab48ffe0dbd0d32cafcc339cf6bbe4f30.tar.gz |
SERVER-62147 Fix broken OP_QUERY exhaust cursor implementation
(cherry picked from commit fb4b3eba611b3bc2408cc3e86fa1d1cba9085fde)
(cherry picked from commit fbcee2558090f25bcaa00879b415f018b7da058b)
-rw-r--r-- | buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml | 1 | ||||
-rw-r--r-- | etc/backports_required_for_multiversion_tests.yml | 2 | ||||
-rw-r--r-- | jstests/core/exhaust.js | 43 | ||||
-rw-r--r-- | src/mongo/rpc/op_msg_integration_test.cpp | 76 | ||||
-rw-r--r-- | src/mongo/transport/service_state_machine.cpp | 2 |
5 files changed, 118 insertions, 6 deletions
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml index 395b2072c56..0ce826f6ba6 100644 --- a/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml @@ -20,6 +20,7 @@ selector: # primary's oplog when run as a part of burn_in_tests. - jstests/core/max_doc_size.js - jstests/core/mr_bigobject.js + - jstests/core/exhaust.js exclude_with_any_tags: # emptycapped is not supported with rollback using recover-to-timestamp. - requires_emptycapped diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml index 82662d7f1f7..0fa0e4b083f 100644 --- a/etc/backports_required_for_multiversion_tests.yml +++ b/etc/backports_required_for_multiversion_tests.yml @@ -128,6 +128,8 @@ all: test_file: jstests/sharding/coordinate_txn_commit_with_tickets_exhausted.js - ticket: SERVER-60685 test_file: jstests/sharding/cancel_coordinate_txn_commit_with_tickets_exhausted.js + - ticket: SERVER-62147 + test_file: jstests/core/exhaust.js # Tests that should only be excluded from particular suites should be listed under that suite. suites: diff --git a/jstests/core/exhaust.js b/jstests/core/exhaust.js index 125c70cefe8..bc1ce387157 100644 --- a/jstests/core/exhaust.js +++ b/jstests/core/exhaust.js @@ -3,23 +3,56 @@ (function() { 'use strict'; -var c = db.exhaustColl; -c.drop(); +load("jstests/libs/fixture_helpers.js"); + +var coll = db.exhaustColl; +coll.drop(); const docCount = 4; for (var i = 0; i < docCount; i++) { - assert.writeOK(c.insert({a: i})); + assert.commandWorked(coll.insert({a: i})); } // Check that the query works without exhaust set -assert.eq(c.find().batchSize(1).itcount(), docCount); +assert.eq(coll.find().batchSize(1).itcount(), docCount); // Now try to run the same query with exhaust try { - assert.eq(c.find().batchSize(1).addOption(DBQuery.Option.exhaust).itcount(), docCount); + assert.eq(coll.find().batchSize(1).addOption(DBQuery.Option.exhaust).itcount(), docCount); } catch (e) { // The exhaust option is not valid against mongos, ensure that this query throws the right // code assert.eq(e.code, 18526, () => tojson(e)); } + +// Test a case where the amount of data requires a response to the initial find operation as well as +// three getMore reply batches. +(function() { +// Skip this test case if we are connected to a mongos, since exhaust queries generally aren't +// expected to work against a mongos. +if (FixtureHelpers.isMongos(db)) { + return; +} + +coll.drop(); + +// Include a long string in each document so that the documents are a bit bigger than 16KB. +const strSize = 16 * 1024; +// The docs are ~16KB and each getMore response is 16MB. Therefore, a full getMore batch will +// contain about 1000 documents. Since the initial find response is limited to only 101 documents, +// by inserting 3000 we ensure that three subsequent getMore replies are required. Roughly speaking, +// the initial reply will consist of the first 100, the first getMore reply 1000 more, then another +// 1000, and then the remaining 900. +const numDocs = 3000; + +const str = "A".repeat(strSize); + +let bulk = coll.initializeUnorderedBulkOp(); +for (let i = 0; i < numDocs; ++i) { + bulk.insert({key: str}); +} +assert.commandWorked(bulk.execute()); + +assert.eq(numDocs, coll.find().addOption(DBQuery.Option.exhaust).itcount()); +}()); }()); diff --git a/src/mongo/rpc/op_msg_integration_test.cpp b/src/mongo/rpc/op_msg_integration_test.cpp index 756bebf21f2..f630c370e2b 100644 --- a/src/mongo/rpc/op_msg_integration_test.cpp +++ b/src/mongo/rpc/op_msg_integration_test.cpp @@ -438,6 +438,82 @@ TEST(OpMsg, ExhaustWithDBClientCursorBehavesCorrectly) { ASSERT(cursor->isDead()); } +TEST(OpMsg, ExhaustWorksForAggCursor) { + std::string errMsg; + auto conn = std::unique_ptr<DBClientBase>( + unittest::getFixtureConnectionString().connect("integration_test", errMsg)); + uassert(ErrorCodes::SocketException, errMsg, conn); + + // Only test exhaust against a standalone. + if (conn->isReplicaSetMember() || conn->isMongos()) { + return; + } + + NamespaceString nss("test", "coll"); + + conn->dropCollection(nss.toString()); + + // Insert 5 documents so that a cursor using a batchSize of 2 requires three batches to get all + // the results. + for (int i = 0; i < 5; i++) { + conn->insert(nss.toString(), BSON("_id" << i)); + } + + // Issue an agg request to open a cursor but return 0 documents. Specify a sort in order to + // guarantee their return order. + auto aggCmd = BSON("aggregate" << nss.coll() << "cursor" << BSON("batchSize" << 0) << "pipeline" + << BSON_ARRAY(BSON("$sort" << BSON("_id" << 1)))); + auto opMsgRequest = OpMsgRequest::fromDBAndBody(nss.db(), aggCmd); + auto request = opMsgRequest.serialize(); + + Message reply; + ASSERT(conn->call(request, reply)); + auto res = OpMsg::parse(reply).body; + const long long cursorId = res["cursor"]["id"].numberLong(); + ASSERT(res["cursor"]["firstBatch"].Array().empty()); + ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + + // Construct getMore request with exhaust flag. Set batch size so we will need multiple batches + // to exhaust the cursor. + int batchSize = 2; + GetMoreRequest gmr(nss, cursorId, batchSize, boost::none, boost::none, boost::none); + opMsgRequest = OpMsgRequest::fromDBAndBody(nss.db(), gmr.toBSON()); + request = opMsgRequest.serialize(); + OpMsg::setFlag(&request, OpMsg::kExhaustSupported); + + auto assertNextBatch = + [](const Message& msg, CursorId expectedCursorId, std::vector<BSONObj> expectedBatch) { + auto cmdReply = OpMsg::parse(msg).body; + ASSERT_OK(getStatusFromCommandResult(cmdReply)); + ASSERT_EQ(cmdReply["cursor"]["id"].numberLong(), expectedCursorId); + std::vector<BSONElement> nextBatch = cmdReply["cursor"]["nextBatch"].Array(); + ASSERT_EQ(nextBatch.size(), expectedBatch.size()); + auto it = expectedBatch.begin(); + for (auto&& batchElt : nextBatch) { + ASSERT(it != expectedBatch.end()); + ASSERT_BSONOBJ_EQ(batchElt.embeddedObject(), *it); + ++it; + } + }; + + // Run getMore to initiate the exhaust stream. + ASSERT(conn->call(request, reply)); + auto lastRequestId = reply.header().getId(); + ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + assertNextBatch(reply, cursorId, {BSON("_id" << 0), BSON("_id" << 1)}); + + // Receive next exhaust batch. + ASSERT(conn->recv(reply, lastRequestId)); + lastRequestId = reply.header().getId(); + ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + assertNextBatch(reply, cursorId, {BSON("_id" << 2), BSON("_id" << 3)}); + + // Receive terminal batch. + ASSERT(conn->recv(reply, lastRequestId)); + ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + assertNextBatch(reply, 0, {BSON("_id" << 4)}); +} + void checksumTest(bool enableChecksum) { // The server replies with a checksum if and only if the request has a checksum. std::string errMsg; diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp index effeeae3903..2f0b4e457fa 100644 --- a/src/mongo/transport/service_state_machine.cpp +++ b/src/mongo/transport/service_state_machine.cpp @@ -106,7 +106,7 @@ Message makeLegacyExhaustMessage(Message* m, const DbResponse& dbresponse) { * Currently only supports exhaust for 'getMore' commands. */ Message makeExhaustMessage(Message requestMsg, DbResponse* dbresponse) { - if (requestMsg.operation() == dbQuery) { + if (requestMsg.operation() == dbQuery || requestMsg.operation() == dbGetMore) { return makeLegacyExhaustMessage(&requestMsg, *dbresponse); } |