author     David Storch <david.storch@mongodb.com>  2022-01-18 15:14:46 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-01-18 15:44:51 +0000
commit     fbcee2558090f25bcaa00879b415f018b7da058b (patch)
tree       52e9ba3a4e50587835dfe178a9b453ac09aa8ffc
parent     408474bcb949872e8757b6b3e4dca2dc3816462c (diff)
download   mongo-fbcee2558090f25bcaa00879b415f018b7da058b.tar.gz
SERVER-62147 Fix broken OP_QUERY exhaust cursor implementation
(cherry picked from commit fb4b3eba611b3bc2408cc3e86fa1d1cba9085fde)
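
The change makes a cursor registered via OP_QUERY exhaust carry QueryOption_Exhaust, so the OP_GET_MORE path keeps streaming batches without waiting for further client requests, and teaches the service state machine to synthesize the follow-up getMores. For illustration, a minimal mongo shell sketch of the behavior being repaired (a reduction of the regression test added to jstests/core/exhaust.js below; it assumes a standalone mongod, since mongos rejects the exhaust option with code 18526):

// Insert enough ~16KB documents that an exhaust find needs the initial reply
// plus several getMore reply batches to stream everything back.
const coll = db.exhaustColl;
coll.drop();
const str = "A".repeat(16 * 1024);
let bulk = coll.initializeUnorderedBulkOp();
for (let i = 0; i < 3000; ++i) {
    bulk.insert({key: str});
}
assert.commandWorked(bulk.execute());

// With the OP_QUERY exhaust fix, the server keeps replying until the cursor
// is drained, so the client sees all 3000 documents.
assert.eq(3000, coll.find().addOption(DBQuery.Option.exhaust).itcount());
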
-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml |  1
-rw-r--r--  etc/backports_required_for_multiversion_tests.yml |  2
-rw-r--r--  jstests/core/exhaust.js | 43
-rw-r--r--  src/mongo/db/clientcursor.h | 12
-rw-r--r--  src/mongo/db/commands/find_cmd.cpp |  3
-rw-r--r--  src/mongo/db/commands/list_collections.cpp |  3
-rw-r--r--  src/mongo/db/commands/list_indexes.cpp |  3
-rw-r--r--  src/mongo/db/commands/run_aggregate.cpp |  3
-rw-r--r--  src/mongo/db/query/find.cpp | 21
-rw-r--r--  src/mongo/dbtests/cursor_manager_test.cpp | 33
-rw-r--r--  src/mongo/rpc/op_msg_integration_test.cpp | 76
-rw-r--r--  src/mongo/transport/service_state_machine.cpp |  2
12 files changed, 169 insertions, 33 deletions
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml
index 3ccf503618d..aaa9494d0f5 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml
@@ -18,6 +18,7 @@ selector:
# primary's oplog when run as a part of burn_in_tests.
- jstests/core/max_doc_size.js
- jstests/core/mr_bigobject.js
+ - jstests/core/exhaust.js
# The following tests also create large oplog entries due to the maximum blocking sort size being
# 100 MB.
- jstests/core/explain_execution_error.js
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml
index 9604bccbed2..6b0b4fa9a14 100644
--- a/etc/backports_required_for_multiversion_tests.yml
+++ b/etc/backports_required_for_multiversion_tests.yml
@@ -193,6 +193,8 @@ all:
test_file: jstests/auth/dbcheck.js
- ticket: SERVER-62212
test_file: jstests/replsets/dbcheck_write_concern.js
+ - ticket: SERVER-62147
+ test_file: jstests/core/exhaust.js
suites:
diff --git a/jstests/core/exhaust.js b/jstests/core/exhaust.js
index 7e6c139d367..bc1ce387157 100644
--- a/jstests/core/exhaust.js
+++ b/jstests/core/exhaust.js
@@ -3,23 +3,56 @@
(function() {
'use strict';
-var c = db.exhaustColl;
-c.drop();
+load("jstests/libs/fixture_helpers.js");
+
+var coll = db.exhaustColl;
+coll.drop();
const docCount = 4;
for (var i = 0; i < docCount; i++) {
- assert.commandWorked(c.insert({a: i}));
+ assert.commandWorked(coll.insert({a: i}));
}
// Check that the query works without exhaust set
-assert.eq(c.find().batchSize(1).itcount(), docCount);
+assert.eq(coll.find().batchSize(1).itcount(), docCount);
// Now try to run the same query with exhaust
try {
- assert.eq(c.find().batchSize(1).addOption(DBQuery.Option.exhaust).itcount(), docCount);
+ assert.eq(coll.find().batchSize(1).addOption(DBQuery.Option.exhaust).itcount(), docCount);
} catch (e) {
// The exhaust option is not valid against mongos, ensure that this query throws the right
// code
assert.eq(e.code, 18526, () => tojson(e));
}
+
+// Test a case where the amount of data requires a response to the initial find operation as well as
+// three getMore reply batches.
+(function() {
+// Skip this test case if we are connected to a mongos, since exhaust queries generally aren't
+// expected to work against a mongos.
+if (FixtureHelpers.isMongos(db)) {
+ return;
+}
+
+coll.drop();
+
+// Include a long string in each document so that the documents are a bit bigger than 16KB.
+const strSize = 16 * 1024;
+// The docs are ~16KB and each getMore response is 16MB. Therefore, a full getMore batch will
+// contain about 1000 documents. Since the initial find response is limited to only 101 documents,
+// by inserting 3000 we ensure that three subsequent getMore replies are required. Roughly speaking,
+// the initial reply will consist of the first 100, the first getMore reply 1000 more, then another
+// 1000, and then the remaining 900.
+const numDocs = 3000;
+
+const str = "A".repeat(strSize);
+
+let bulk = coll.initializeUnorderedBulkOp();
+for (let i = 0; i < numDocs; ++i) {
+ bulk.insert({key: str});
+}
+assert.commandWorked(bulk.execute());
+
+assert.eq(numDocs, coll.find().addOption(DBQuery.Option.exhaust).itcount());
+}());
}());
diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h
index 5979ed4521c..84f1cff4650 100644
--- a/src/mongo/db/clientcursor.h
+++ b/src/mongo/db/clientcursor.h
@@ -83,7 +83,8 @@ struct ClientCursorParams {
BSONObj originatingCommandObj,
LockPolicy lockPolicy,
PrivilegeVector originatingPrivileges,
- bool needsMerge)
+ bool needsMerge,
+ bool isOpQueryExhaust)
: exec(std::move(planExecutor)),
nss(std::move(nss)),
writeConcernOptions(std::move(writeConcernOptions)),
@@ -98,6 +99,15 @@ struct ClientCursorParams {
while (authenticatedUsersIter.more()) {
authenticatedUsers.emplace_back(authenticatedUsersIter.next());
}
+
+ // There are separate implementations for exhaust cursors using OP_QUERY and exhaust cursors
+ // using OP_MSG. In the case of OP_QUERY exhaust, the OP_GET_MORE code consults the cursor's
+ // 'queryOptions' bit vector in order to determine whether another getMore should run again
+ // immediately without waiting for a request from the client. Here we set the exhaust bit if
+ // the cursor is being registered as an OP_QUERY exhaust cursor.
+ if (isOpQueryExhaust) {
+ queryOptions |= QueryOption_Exhaust;
+ }
}
void setTailable(bool tailable) {
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 782d76b8028..a70970406c5 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -621,7 +621,8 @@ public:
_request.body,
ClientCursorParams::LockPolicy::kLockExternally,
{Privilege(ResourcePattern::forExactNamespace(nss), ActionType::find)},
- expCtx->needsMerge});
+ expCtx->needsMerge,
+ false /*isOpQueryExhaust*/});
cursorId = pinnedCursor.getCursor()->cursorid();
invariant(!exec);
diff --git a/src/mongo/db/commands/list_collections.cpp b/src/mongo/db/commands/list_collections.cpp
index fae03f2159a..b6a7d73d8db 100644
--- a/src/mongo/db/commands/list_collections.cpp
+++ b/src/mongo/db/commands/list_collections.cpp
@@ -404,7 +404,8 @@ public:
ClientCursorParams::LockPolicy::kLocksInternally,
uassertStatusOK(AuthorizationSession::get(opCtx->getClient())
->checkAuthorizedToListCollections(dbname, jsobj)),
- false // needsMerge always 'false' for listCollections.
+ false, // needsMerge always 'false' for listCollections.
+ false // isOpQueryExhaust
});
appendCursorResponseObject(
diff --git a/src/mongo/db/commands/list_indexes.cpp b/src/mongo/db/commands/list_indexes.cpp
index 0c3cf1c3053..fbf985a6b05 100644
--- a/src/mongo/db/commands/list_indexes.cpp
+++ b/src/mongo/db/commands/list_indexes.cpp
@@ -215,7 +215,8 @@ public:
cmdObj,
ClientCursorParams::LockPolicy::kLocksInternally,
{Privilege(ResourcePattern::forExactNamespace(nss), ActionType::listIndexes)},
- false // needsMerge always 'false' for listIndexes.
+ false, // needsMerge always 'false' for listIndexes.
+ false // isOpQueryExhaust
});
appendCursorResponseObject(
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 93ef6659d87..a98b48f43cb 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -796,7 +796,8 @@ Status runAggregate(OperationContext* opCtx,
cmdObj,
lockPolicy,
privileges,
- expCtx->needsMerge);
+ expCtx->needsMerge,
+ false /*isOpQueryExhaust*/);
if (expCtx->tailableMode == TailableModeEnum::kTailable) {
cursorParams.setTailable(true);
} else if (expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) {
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index 10ee17cb3ca..0648cfb0011 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -777,17 +777,16 @@ bool runQuery(OperationContext* opCtx,
// Allocate a new ClientCursor and register it with the cursor manager.
ClientCursorPin pinnedCursor = CursorManager::get(opCtx)->registerCursor(
opCtx,
- {
- std::move(exec),
- nss,
- AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
- opCtx->getWriteConcern(),
- readConcernArgs,
- upconvertedQuery,
- ClientCursorParams::LockPolicy::kLockExternally,
- {Privilege(ResourcePattern::forExactNamespace(nss), ActionType::find)},
- false // needsMerge always 'false' for find().
- });
+ {std::move(exec),
+ nss,
+ AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
+ opCtx->getWriteConcern(),
+ readConcernArgs,
+ upconvertedQuery,
+ ClientCursorParams::LockPolicy::kLockExternally,
+ {Privilege(ResourcePattern::forExactNamespace(nss), ActionType::find)},
+ false, // needsMerge always 'false' for find().
+ opCtx->isExhaust()});
ccId = pinnedCursor.getCursor()->cursorid();
LOGV2_DEBUG(
diff --git a/src/mongo/dbtests/cursor_manager_test.cpp b/src/mongo/dbtests/cursor_manager_test.cpp
index 124de6ead10..12234f50794 100644
--- a/src/mongo/dbtests/cursor_manager_test.cpp
+++ b/src/mongo/dbtests/cursor_manager_test.cpp
@@ -94,7 +94,8 @@ public:
BSONObj(),
ClientCursorParams::LockPolicy::kLocksInternally,
PrivilegeVector(),
- false // needsMerge
+ false, // needsMerge
+ false // isOpQueryExhaust
};
}
@@ -148,7 +149,8 @@ TEST_F(CursorManagerTest, ShouldBeAbleToKillPinnedCursor) {
BSONObj(),
ClientCursorParams::LockPolicy::kLocksInternally,
PrivilegeVector(),
- false // needsMerge
+ false, // needsMerge
+ false // isOpQueryExhaust
});
auto cursorId = cursorPin.getCursor()->cursorid();
@@ -178,7 +180,8 @@ TEST_F(CursorManagerTest, ShouldBeAbleToKillPinnedCursorMultiClient) {
BSONObj(),
ClientCursorParams::LockPolicy::kLocksInternally,
PrivilegeVector(),
- false // needsMerge
+ false, // needsMerge
+ false // isOpQueryExhaust
});
auto cursorId = cursorPin.getCursor()->cursorid();
@@ -219,7 +222,8 @@ TEST_F(CursorManagerTest, InactiveCursorShouldTimeout) {
BSONObj(),
ClientCursorParams::LockPolicy::kLocksInternally,
PrivilegeVector(),
- false // needsMerge
+ false, // needsMerge
+ false // isOpQueryExhaust
});
ASSERT_EQ(0UL, cursorManager->timeoutCursors(_opCtx.get(), Date_t()));
@@ -239,7 +243,8 @@ TEST_F(CursorManagerTest, InactiveCursorShouldTimeout) {
BSONObj(),
ClientCursorParams::LockPolicy::kLocksInternally,
PrivilegeVector(),
- false // needsMerge
+ false, // needsMerge
+ false // isOpQueryExhaust
});
ASSERT_EQ(1UL, cursorManager->timeoutCursors(_opCtx.get(), Date_t::max()));
ASSERT_EQ(0UL, cursorManager->numCursors());
@@ -263,7 +268,8 @@ TEST_F(CursorManagerTest, InactivePinnedCursorShouldNotTimeout) {
BSONObj(),
ClientCursorParams::LockPolicy::kLocksInternally,
PrivilegeVector(),
- false // needsMerge
+ false, // needsMerge
+ false // isOpQueryExhaust
});
// The pin is still in scope, so it should not time out.
@@ -291,7 +297,8 @@ TEST_F(CursorManagerTest, MarkedAsKilledCursorsShouldBeDeletedOnCursorPin) {
BSONObj(),
ClientCursorParams::LockPolicy::kLocksInternally,
PrivilegeVector(),
- false // needsMerge
+ false, // needsMerge
+ false // isOpQueryExhaust
});
auto cursorId = cursorPin->cursorid();
@@ -328,7 +335,8 @@ TEST_F(CursorManagerTest, InactiveKilledCursorsShouldTimeout) {
BSONObj(),
ClientCursorParams::LockPolicy::kLocksInternally,
PrivilegeVector(),
- false // needsMerge
+ false, // needsMerge
+ false // isOpQueryExhaust
});
// A cursor will stay alive, but be marked as killed, if it is interrupted with a code other
@@ -364,7 +372,8 @@ TEST_F(CursorManagerTest, UsingACursorShouldUpdateTimeOfLastUse) {
BSONObj(),
ClientCursorParams::LockPolicy::kLocksInternally,
PrivilegeVector(),
- false // needsMerge
+ false, // needsMerge
+ false // isOpQueryExhaust
});
auto usedCursorId = cursorPin.getCursor()->cursorid();
cursorPin.release();
@@ -382,7 +391,8 @@ TEST_F(CursorManagerTest, UsingACursorShouldUpdateTimeOfLastUse) {
BSONObj(),
ClientCursorParams::LockPolicy::kLocksInternally,
PrivilegeVector(),
- false // needsMerge
+ false, // needsMerge
+ false // isOpQueryExhaust
});
// Advance the clock to simulate time passing.
@@ -423,7 +433,8 @@ TEST_F(CursorManagerTest, CursorShouldNotTimeOutUntilIdleForLongEnoughAfterBeing
BSONObj(),
ClientCursorParams::LockPolicy::kLocksInternally,
PrivilegeVector(),
- false // needsMerge
+ false, // needsMerge
+ false // isOpQueryExhaust
});
// Advance the clock to simulate time passing.
diff --git a/src/mongo/rpc/op_msg_integration_test.cpp b/src/mongo/rpc/op_msg_integration_test.cpp
index 619ccd1f594..dc536939f3b 100644
--- a/src/mongo/rpc/op_msg_integration_test.cpp
+++ b/src/mongo/rpc/op_msg_integration_test.cpp
@@ -555,6 +555,82 @@ TEST(OpMsg, MongosIgnoresExhaustForGetMore) {
ASSERT_BSONOBJ_EQ(nextBatch[1].embeddedObject(), BSON("_id" << 1));
}
+TEST(OpMsg, ExhaustWorksForAggCursor) {
+ std::string errMsg;
+ auto conn = std::unique_ptr<DBClientBase>(
+ unittest::getFixtureConnectionString().connect("integration_test", errMsg));
+ uassert(ErrorCodes::SocketException, errMsg, conn);
+
+ // Only test exhaust against a standalone.
+ if (conn->isReplicaSetMember() || conn->isMongos()) {
+ return;
+ }
+
+ NamespaceString nss("test", "coll");
+
+ conn->dropCollection(nss.toString());
+
+ // Insert 5 documents so that a cursor using a batchSize of 2 requires three batches to get all
+ // the results.
+ for (int i = 0; i < 5; i++) {
+ conn->insert(nss.toString(), BSON("_id" << i));
+ }
+
+ // Issue an agg request to open a cursor but return 0 documents. Specify a sort in order to
+ // guarantee their return order.
+ auto aggCmd = BSON("aggregate" << nss.coll() << "cursor" << BSON("batchSize" << 0) << "pipeline"
+ << BSON_ARRAY(BSON("$sort" << BSON("_id" << 1))));
+ auto opMsgRequest = OpMsgRequest::fromDBAndBody(nss.db(), aggCmd);
+ auto request = opMsgRequest.serialize();
+
+ Message reply;
+ ASSERT(conn->call(request, reply));
+ auto res = OpMsg::parse(reply).body;
+ const long long cursorId = res["cursor"]["id"].numberLong();
+ ASSERT(res["cursor"]["firstBatch"].Array().empty());
+ ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+
+ // Construct getMore request with exhaust flag. Set batch size so we will need multiple batches
+ // to exhaust the cursor.
+ int batchSize = 2;
+ GetMoreRequest gmr(nss, cursorId, batchSize, boost::none, boost::none, boost::none);
+ opMsgRequest = OpMsgRequest::fromDBAndBody(nss.db(), gmr.toBSON());
+ request = opMsgRequest.serialize();
+ OpMsg::setFlag(&request, OpMsg::kExhaustSupported);
+
+ auto assertNextBatch =
+ [](const Message& msg, CursorId expectedCursorId, std::vector<BSONObj> expectedBatch) {
+ auto cmdReply = OpMsg::parse(msg).body;
+ ASSERT_OK(getStatusFromCommandResult(cmdReply));
+ ASSERT_EQ(cmdReply["cursor"]["id"].numberLong(), expectedCursorId);
+ std::vector<BSONElement> nextBatch = cmdReply["cursor"]["nextBatch"].Array();
+ ASSERT_EQ(nextBatch.size(), expectedBatch.size());
+ auto it = expectedBatch.begin();
+ for (auto&& batchElt : nextBatch) {
+ ASSERT(it != expectedBatch.end());
+ ASSERT_BSONOBJ_EQ(batchElt.embeddedObject(), *it);
+ ++it;
+ }
+ };
+
+ // Run getMore to initiate the exhaust stream.
+ ASSERT(conn->call(request, reply));
+ auto lastRequestId = reply.header().getId();
+ ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+ assertNextBatch(reply, cursorId, {BSON("_id" << 0), BSON("_id" << 1)});
+
+ // Receive next exhaust batch.
+ ASSERT_OK(conn->recv(reply, lastRequestId));
+ lastRequestId = reply.header().getId();
+ ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+ assertNextBatch(reply, cursorId, {BSON("_id" << 2), BSON("_id" << 3)});
+
+ // Receive terminal batch.
+ ASSERT_OK(conn->recv(reply, lastRequestId));
+ ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+ assertNextBatch(reply, 0, {BSON("_id" << 4)});
+}
+
TEST(OpMsg, ServerHandlesExhaustIsMasterCorrectly) {
std::string errMsg;
auto fixtureConn = std::unique_ptr<DBClientBase>(
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index ac67e983765..42a633829af 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -113,7 +113,7 @@ Message makeLegacyExhaustMessage(Message* m, const DbResponse& dbresponse) {
* 'synthetic' exhaust request. Returns an empty message if exhaust is not allowed.
*/
Message makeExhaustMessage(Message requestMsg, DbResponse* dbresponse) {
- if (requestMsg.operation() == dbQuery) {
+ if (requestMsg.operation() == dbQuery || requestMsg.operation() == dbGetMore) {
return makeLegacyExhaustMessage(&requestMsg, *dbresponse);
}