summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorTess Avitabile <tess.avitabile@mongodb.com>2019-12-20 18:43:14 +0000
committerevergreen <evergreen@mongodb.com>2019-12-20 18:43:14 +0000
commit3fea6b339770dcdead06803b0c794553c25b94fb (patch)
tree09f84db37faa08da48957d967ba597fb7207ebaa /src/mongo
parentc5bd0178db8f5ea16f7df4e78a52fda56926d0b9 (diff)
downloadmongo-3fea6b339770dcdead06803b0c794553c25b94fb.tar.gz
SERVER-44517 Refactor exhaust cursors on top of isMaster code changes
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp6
-rw-r--r--src/mongo/db/dbmessage.h7
-rw-r--r--src/mongo/db/query/find.cpp27
-rw-r--r--src/mongo/db/query/find.h7
-rw-r--r--src/mongo/db/repl/replication_info.cpp3
-rw-r--r--src/mongo/db/service_entry_point_common.cpp17
-rw-r--r--src/mongo/dbtests/querytests.cpp4
-rw-r--r--src/mongo/rpc/op_msg_integration_test.cpp170
-rw-r--r--src/mongo/s/commands/strategy.cpp9
-rw-r--r--src/mongo/transport/service_state_machine.cpp97
-rw-r--r--src/mongo/transport/service_state_machine_test.cpp130
11 files changed, 219 insertions, 258 deletions
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index d0129eaba5c..1ef70f0d2d0 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -634,6 +634,12 @@ public:
if (respondWithId) {
cursorFreer.dismiss();
+
+ if (opCtx->isExhaust()) {
+ // Indicate that an exhaust message should be generated and the previous BSONObj
+ // command parameters should be reused as the next BSONObj command parameters.
+ reply->setNextInvocation(boost::none);
+ }
}
}
diff --git a/src/mongo/db/dbmessage.h b/src/mongo/db/dbmessage.h
index f66239637d9..d2db642f298 100644
--- a/src/mongo/db/dbmessage.h
+++ b/src/mongo/db/dbmessage.h
@@ -452,13 +452,6 @@ struct DbResponse {
// The next invocation for an exhaust command. If this is boost::none, the previous invocation
// should be reused for the next invocation.
boost::optional<BSONObj> nextInvocation;
-
- // TODO SERVER-44517: Remove 'exhaustNS' and 'exhaustCursorId'. Instead, GetMoreCmd::run()
- // should set 'shouldRunAgainForExhaust'.
- std::string exhaustNS; // Namespace of cursor if exhaust mode, else "".
- // Cursor ID when running on exhaust mode. Defaults to '0', indicating
- // that the cursor is exhausted.
- long long exhaustCursorId = 0;
};
/**
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index 0d903bd3086..d118bcb7ed3 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -289,6 +289,8 @@ Message getMore(OperationContext* opCtx,
uassertStatusOK(statusWithCursorPin.getStatus());
auto cursorPin = std::move(statusWithCursorPin.getValue());
+ opCtx->setExhaust(cursorPin->queryOptions() & QueryOption_Exhaust);
+
if (cursorPin->lockPolicy() == ClientCursorParams::LockPolicy::kLocksInternally) {
if (!nss.isCollectionlessCursorNamespace()) {
AutoGetDb autoDb(opCtx, nss.db(), MODE_IS);
@@ -534,7 +536,8 @@ Message getMore(OperationContext* opCtx,
exec->detachFromOperationContext();
LOG(5) << "getMore saving client cursor ended with state " << PlanExecutor::statestr(state);
- *exhaust = cursorPin->queryOptions() & QueryOption_Exhaust;
+ // Set 'exhaust' if the client requested exhaust and the cursor is not exhausted.
+ *exhaust = opCtx->isExhaust();
// We assume that cursors created through a DBDirectClient are always used from their
// original OperationContext, so we do not need to move time to and from the cursor.
@@ -568,10 +571,10 @@ Message getMore(OperationContext* opCtx,
return Message(bb.release());
}
-std::string runQuery(OperationContext* opCtx,
- QueryMessage& q,
- const NamespaceString& nss,
- Message& result) {
+bool runQuery(OperationContext* opCtx,
+ QueryMessage& q,
+ const NamespaceString& nss,
+ Message& result) {
CurOp& curOp = *CurOp::get(opCtx);
curOp.ensureStarted();
@@ -610,6 +613,8 @@ std::string runQuery(OperationContext* opCtx,
Collection* const collection = ctx.getCollection();
const QueryRequest& qr = cq->getQueryRequest();
+ opCtx->setExhaust(qr.isExhaust());
+
{
// Allow the query to run on secondaries if the read preference permits it. If no read
// preference was specified, allow the query to run iff slaveOk has been set.
@@ -649,7 +654,7 @@ std::string runQuery(OperationContext* opCtx,
qr.setStartingFrom(0);
qr.setNReturned(1);
result.setData(bb.release());
- return "";
+ return false;
}
// Handle query option $maxTimeMS (not used with commands).
@@ -744,8 +749,9 @@ std::string runQuery(OperationContext* opCtx,
LOG(5) << "caching executor with cursorid " << ccId << " after returning " << numResults
<< " results";
- // TODO document
- if (qr.isExhaust()) {
+ // Set curOp.debug().exhaust if the client requested exhaust and the cursor is not
+ // exhausted.
+ if (opCtx->isExhaust()) {
curOp.debug().exhaust = true;
}
@@ -778,8 +784,9 @@ std::string runQuery(OperationContext* opCtx,
// Add the results from the query into the output buffer.
result.setData(bb.release());
- // curOp.debug().exhaust is set above.
- return curOp.debug().exhaust ? nss.ns() : "";
+ // curOp.debug().exhaust is set above if the client requested exhaust and the cursor is not
+ // exhausted.
+ return curOp.debug().exhaust;
}
} // namespace mongo
diff --git a/src/mongo/db/query/find.h b/src/mongo/db/query/find.h
index 0a1e1a15078..2522ae338c5 100644
--- a/src/mongo/db/query/find.h
+++ b/src/mongo/db/query/find.h
@@ -98,11 +98,8 @@ Message getMore(OperationContext* opCtx,
bool* isCursorAuthorized);
/**
- * Run the query 'q' and place the result in 'result'.
+ * Run the query 'q' and place the result in 'result'. Returns true if in exhaust mode.
*/
-std::string runQuery(OperationContext* opCtx,
- QueryMessage& q,
- const NamespaceString& ns,
- Message& result);
+bool runQuery(OperationContext* opCtx, QueryMessage& q, const NamespaceString& ns, Message& result);
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp
index e3fe5de44dc..e058a7442aa 100644
--- a/src/mongo/db/repl/replication_info.cpp
+++ b/src/mongo/db/repl/replication_info.cpp
@@ -462,7 +462,8 @@ public:
if (clientTopologyVersion->getProcessId() == currentTopologyVersion->getProcessId() &&
clientTopologyVersion->getCounter() == currentTopologyVersion->getCounter()) {
- // Indicate that the previous invocation should be reused for the next invocation.
+ // Indicate that an exhaust message should be generated and the previous BSONObj
+ // command parameters should be reused as the next BSONObj command parameters.
replyBuilder->setNextInvocation(boost::none);
} else {
BSONObjBuilder nextInvocationBuilder;
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 58e3d1be0cf..4570d44bb74 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -1181,17 +1181,12 @@ DbResponse receivedCommands(OperationContext* opCtx,
DbResponse dbResponse;
- dbResponse.shouldRunAgainForExhaust = replyBuilder->shouldRunAgainForExhaust();
- dbResponse.nextInvocation = replyBuilder->getNextInvocation();
-
- // TODO SERVER-44517: This block can be removed once 'exhaustNS' and 'exhaustCursorId' are
- // removed from DbResponse.
if (OpMsg::isFlagSet(message, OpMsg::kExhaustSupported)) {
auto responseObj = replyBuilder->getBodyBuilder().asTempObj();
- auto cursorObj = responseObj.getObjectField("cursor");
- if (responseObj.getField("ok").trueValue() && !cursorObj.isEmpty()) {
- dbResponse.exhaustNS = cursorObj.getField("ns").String();
- dbResponse.exhaustCursorId = cursorObj.getField("id").numberLong();
+
+ if (responseObj.getField("ok").trueValue()) {
+ dbResponse.shouldRunAgainForExhaust = replyBuilder->shouldRunAgainForExhaust();
+ dbResponse.nextInvocation = replyBuilder->getNextInvocation();
}
}
@@ -1222,7 +1217,7 @@ DbResponse receivedQuery(OperationContext* opCtx,
audit::logQueryAuthzCheck(client, nss, q.query, status.code());
uassertStatusOK(status);
- dbResponse.exhaustNS = runQuery(opCtx, q, nss, dbResponse.response);
+ dbResponse.shouldRunAgainForExhaust = runQuery(opCtx, q, nss, dbResponse.response);
} catch (const AssertionException& e) {
behaviors.handleException(e, opCtx);
@@ -1384,7 +1379,7 @@ DbResponse receivedGetMore(OperationContext* opCtx,
if (exhaust) {
curop.debug().exhaust = true;
- dbresponse.exhaustNS = ns;
+ dbresponse.shouldRunAgainForExhaust = true;
}
return dbresponse;
diff --git a/src/mongo/dbtests/querytests.cpp b/src/mongo/dbtests/querytests.cpp
index c7936436f49..6e6fa668950 100644
--- a/src/mongo/dbtests/querytests.cpp
+++ b/src/mongo/dbtests/querytests.cpp
@@ -1918,9 +1918,7 @@ public:
DbMessage dbMessage(message);
QueryMessage queryMessage(dbMessage);
Message result;
- string exhaust = runQuery(&_opCtx, queryMessage, NamespaceString(ns()), result);
- ASSERT(exhaust.size());
- ASSERT_EQUALS(string(ns()), exhaust);
+ ASSERT_TRUE(runQuery(&_opCtx, queryMessage, NamespaceString(ns()), result));
}
};
diff --git a/src/mongo/rpc/op_msg_integration_test.cpp b/src/mongo/rpc/op_msg_integration_test.cpp
index 4edb0fab6f3..2009a609876 100644
--- a/src/mongo/rpc/op_msg_integration_test.cpp
+++ b/src/mongo/rpc/op_msg_integration_test.cpp
@@ -295,7 +295,7 @@ void exhaustGetMoreTest(bool enableChecksum) {
unittest::getFixtureConnectionString().connect("integration_test", errMsg));
uassert(ErrorCodes::SocketException, errMsg, conn);
- // Only test exhaust against a single server.
+ // Only test exhaust against a standalone.
if (conn->isReplicaSetMember() || conn->isMongos()) {
return;
}
@@ -312,10 +312,11 @@ void exhaustGetMoreTest(bool enableChecksum) {
// Insert a few documents.
for (int i = 0; i < 5; i++) {
- conn->insert(nss.toString(), BSON("_id" << i), 0);
+ conn->insert(nss.toString(), BSON("_id" << i));
}
- // Issue a find request to open a cursor but return 0 documents.
+ // Issue a find request to open a cursor but return 0 documents. Specify a sort in order to
+ // guarantee their return order.
auto findCmd = BSON("find" << nss.coll() << "batchSize" << 0 << "sort" << BSON("_id" << 1));
auto opMsgRequest = OpMsgRequest::fromDBAndBody(nss.db(), findCmd);
auto request = opMsgRequest.serialize();
@@ -383,7 +384,144 @@ TEST(OpMsg, ServerHandlesExhaustGetMoreCorrectlyWithChecksum) {
exhaustGetMoreTest(true);
}
-void exhaustIsMasterTest(bool enableChecksum) {
+TEST(OpMsg, FindIgnoresExhaust) {
+ std::string errMsg;
+ auto conn = std::unique_ptr<DBClientBase>(
+ unittest::getFixtureConnectionString().connect("integration_test", errMsg));
+ uassert(ErrorCodes::SocketException, errMsg, conn);
+
+ // Only test exhaust against a standalone.
+ if (conn->isReplicaSetMember() || conn->isMongos()) {
+ return;
+ }
+
+ NamespaceString nss("test", "coll");
+
+ conn->dropCollection(nss.toString());
+
+ // Insert a few documents.
+ for (int i = 0; i < 5; i++) {
+ conn->insert(nss.toString(), BSON("_id" << i));
+ }
+
+ // Issue a find request with exhaust flag. Returns 0 documents.
+ auto findCmd = BSON("find" << nss.coll() << "batchSize" << 0);
+ auto opMsgRequest = OpMsgRequest::fromDBAndBody(nss.db(), findCmd);
+ auto request = opMsgRequest.serialize();
+ OpMsg::setFlag(&request, OpMsg::kExhaustSupported);
+
+ Message reply;
+ ASSERT(conn->call(request, reply));
+ auto res = OpMsg::parse(reply).body;
+ ASSERT(res["cursor"]["firstBatch"].Array().empty());
+ // The response should not have set moreToCome. We only expect getMore response to set
+ // 'moreToCome'.
+ ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+}
+
+TEST(OpMsg, ServerDoesNotSetMoreToComeOnErrorInGetMore) {
+ std::string errMsg;
+ auto conn = std::unique_ptr<DBClientBase>(
+ unittest::getFixtureConnectionString().connect("integration_test", errMsg));
+ uassert(ErrorCodes::SocketException, errMsg, conn);
+
+ // Only test exhaust against a standalone.
+ if (conn->isReplicaSetMember() || conn->isMongos()) {
+ return;
+ }
+
+ NamespaceString nss("test", "coll");
+
+ conn->dropCollection(nss.toString());
+
+ // Insert a few documents.
+ for (int i = 0; i < 5; i++) {
+ conn->insert(nss.toString(), BSON("_id" << i));
+ }
+
+ // Issue a find request to open a cursor but return 0 documents.
+ auto findCmd = BSON("find" << nss.coll() << "batchSize" << 0);
+ auto opMsgRequest = OpMsgRequest::fromDBAndBody(nss.db(), findCmd);
+ auto request = opMsgRequest.serialize();
+
+ Message reply;
+ ASSERT(conn->call(request, reply));
+ auto res = OpMsg::parse(reply).body;
+ const long long cursorId = res["cursor"]["id"].numberLong();
+ ASSERT(res["cursor"]["firstBatch"].Array().empty());
+ ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+
+ // Drop the collection, so that the next getMore will error.
+ conn->dropCollection(nss.toString());
+
+ // Construct getMore request with exhaust flag.
+ int batchSize = 2;
+ GetMoreRequest gmr(nss, cursorId, batchSize, boost::none, boost::none, boost::none);
+ opMsgRequest = OpMsgRequest::fromDBAndBody(nss.db(), gmr.toBSON());
+ request = opMsgRequest.serialize();
+ OpMsg::setFlag(&request, OpMsg::kExhaustSupported);
+
+ // Run getMore. This should not start an exhaust stream.
+ ASSERT(conn->call(request, reply));
+ // The response should not have set moreToCome.
+ ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+ res = OpMsg::parse(reply).body;
+ ASSERT_NOT_OK(getStatusFromCommandResult(res));
+}
+
+TEST(OpMsg, MongosIgnoresExhaustForGetMore) {
+ std::string errMsg;
+ auto conn = std::unique_ptr<DBClientBase>(
+ unittest::getFixtureConnectionString().connect("integration_test", errMsg));
+ uassert(ErrorCodes::SocketException, errMsg, conn);
+
+ if (!conn->isMongos()) {
+ return;
+ }
+
+ NamespaceString nss("test", "coll");
+
+ conn->dropCollection(nss.toString());
+
+ // Insert a few documents.
+ for (int i = 0; i < 5; i++) {
+ conn->insert(nss.toString(), BSON("_id" << i));
+ }
+
+ // Issue a find request to open a cursor but return 0 documents. Specify a sort in order to
+ // guarantee their return order.
+ auto findCmd = BSON("find" << nss.coll() << "batchSize" << 0 << "sort" << BSON("_id" << 1));
+ auto opMsgRequest = OpMsgRequest::fromDBAndBody(nss.db(), findCmd);
+ auto request = opMsgRequest.serialize();
+
+ Message reply;
+ ASSERT(conn->call(request, reply));
+ auto res = OpMsg::parse(reply).body;
+ const long long cursorId = res["cursor"]["id"].numberLong();
+ ASSERT(res["cursor"]["firstBatch"].Array().empty());
+ ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+
+ // Construct getMore request with exhaust flag.
+ int batchSize = 2;
+ GetMoreRequest gmr(nss, cursorId, batchSize, boost::none, boost::none, boost::none);
+ opMsgRequest = OpMsgRequest::fromDBAndBody(nss.db(), gmr.toBSON());
+ request = opMsgRequest.serialize();
+ OpMsg::setFlag(&request, OpMsg::kExhaustSupported);
+
+ // Run getMore. This should not start an exhaust stream.
+ ASSERT(conn->call(request, reply));
+ // The response should not have set moreToCome.
+ ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+ res = OpMsg::parse(reply).body;
+ ASSERT_OK(getStatusFromCommandResult(res));
+ ASSERT_EQ(res["cursor"]["id"].numberLong(), cursorId);
+ std::vector<BSONElement> nextBatch = res["cursor"]["nextBatch"].Array();
+ ASSERT_EQ(nextBatch.size(), 2U);
+ ASSERT_BSONOBJ_EQ(nextBatch[0].embeddedObject(), BSON("_id" << 0));
+ ASSERT_BSONOBJ_EQ(nextBatch[1].embeddedObject(), BSON("_id" << 1));
+}
+
+TEST(OpMsg, ServerHandlesExhaustIsMasterCorrectly) {
std::string errMsg;
auto fixtureConn = std::unique_ptr<DBClientBase>(
unittest::getFixtureConnectionString().connect("integration_test", errMsg));
@@ -399,12 +537,6 @@ void exhaustIsMasterTest(bool enableChecksum) {
DBClientBase* conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn();
ASSERT(conn);
- if (!enableChecksum) {
- disableClientChecksum();
- }
-
- ON_BLOCK_EXIT([&] { enableClientChecksum(); });
-
auto tickSource = getGlobalServiceContext()->getTickSource();
// Issue an isMaster command without a topology version.
@@ -418,8 +550,6 @@ void exhaustIsMasterTest(bool enableChecksum) {
ASSERT_OK(getStatusFromCommandResult(res));
auto topologyVersion = res["topologyVersion"].Obj().getOwned();
ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
- // Reply has checksum if and only if the request did.
- ASSERT_EQ(OpMsg::isFlagSet(reply, OpMsg::kChecksumPresent), enableChecksum);
// Construct isMaster command with topologyVersion, maxAwaitTimeMS, and exhaust.
isMasterCmd =
@@ -436,7 +566,6 @@ void exhaustIsMasterTest(bool enableChecksum) {
ASSERT_GT(tickSource->ticksTo<Milliseconds>(afterFirstResponse - beforeExhaustCommand),
Milliseconds(50));
ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
- ASSERT_EQ(OpMsg::isFlagSet(reply, OpMsg::kChecksumPresent), enableChecksum);
res = OpMsg::parse(reply).body;
ASSERT_OK(getStatusFromCommandResult(res));
auto nextTopologyVersion = res["topologyVersion"].Obj().getOwned();
@@ -450,7 +579,6 @@ void exhaustIsMasterTest(bool enableChecksum) {
ASSERT_GT(tickSource->ticksTo<Milliseconds>(afterSecondResponse - afterFirstResponse),
Milliseconds(50));
ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
- ASSERT_EQ(OpMsg::isFlagSet(reply, OpMsg::kChecksumPresent), enableChecksum);
res = OpMsg::parse(reply).body;
ASSERT_OK(getStatusFromCommandResult(res));
nextTopologyVersion = res["topologyVersion"].Obj().getOwned();
@@ -459,16 +587,6 @@ void exhaustIsMasterTest(bool enableChecksum) {
// The exhaust stream would continue indefinitely.
}
-TEST(OpMsg, ServerHandlesExhaustIsMasterCorrectly) {
- exhaustIsMasterTest(false);
-}
-
-// TODO SERVER-44517: The checksum logic will be unified for exhaust commands, so we don't need to
-// test checksum for both exhaust isMaster and exhaust getMore.
-TEST(OpMsg, ServerHandlesExhaustIsMasterCorrectlyWithChecksum) {
- exhaustIsMasterTest(true);
-}
-
TEST(OpMsg, ServerHandlesExhaustIsMasterWithTopologyChange) {
std::string errMsg;
auto fixtureConn = std::unique_ptr<DBClientBase>(
@@ -576,7 +694,7 @@ TEST(OpMsg, ExhaustWithDBClientCursorBehavesCorrectly) {
unittest::getFixtureConnectionString().connect("integration_test", errMsg));
uassert(ErrorCodes::SocketException, errMsg, conn);
- // Only test exhaust against a single server.
+ // Only test exhaust against a standalone.
if (conn->isReplicaSetMember() || conn->isMongos()) {
return;
}
@@ -588,7 +706,7 @@ TEST(OpMsg, ExhaustWithDBClientCursorBehavesCorrectly) {
unittest::log() << "Inserting " << nDocs << " documents.";
for (int i = 0; i < nDocs; i++) {
auto doc = BSON("_id" << i);
- conn->insert(nss.toString(), doc, 0);
+ conn->insert(nss.toString(), doc);
}
ASSERT_EQ(conn->count(nss), size_t(nDocs));
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index c80ed6e0956..61f692f315a 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -873,14 +873,11 @@ DbResponse Strategy::clientCommand(OperationContext* opCtx, const Message& m) {
}
DbResponse dbResponse;
- dbResponse.shouldRunAgainForExhaust = reply->shouldRunAgainForExhaust();
- dbResponse.nextInvocation = reply->getNextInvocation();
if (OpMsg::isFlagSet(m, OpMsg::kExhaustSupported)) {
auto responseObj = reply->getBodyBuilder().asTempObj();
- auto cursorObj = responseObj.getObjectField("cursor");
- if (responseObj.getField("ok").trueValue() && !cursorObj.isEmpty()) {
- dbResponse.exhaustNS = cursorObj.getField("ns").String();
- dbResponse.exhaustCursorId = cursorObj.getField("id").numberLong();
+ if (responseObj.getField("ok").trueValue()) {
+ dbResponse.shouldRunAgainForExhaust = reply->shouldRunAgainForExhaust();
+ dbResponse.nextInvocation = reply->getNextInvocation();
}
}
dbResponse.response = reply->done();
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index 68ff7e8c1d9..4ca997d96ed 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -67,10 +67,18 @@ Message makeLegacyExhaustMessage(Message* m, const DbResponse& dbresponse) {
// OP_QUERY responses are always of type OP_REPLY.
invariant(dbresponse.response.operation() == opReply);
- if (dbresponse.exhaustNS.empty()) {
+ if (!dbresponse.shouldRunAgainForExhaust) {
return Message();
}
+ // Legacy find operations via the OP_QUERY/OP_GET_MORE network protocol never provide the next
+ // invocation for exhaust.
+ invariant(!dbresponse.nextInvocation);
+
+ DbMessage dbmsg(*m);
+ invariant(dbmsg.messageShouldHaveNs());
+ const char* ns = dbmsg.getns();
+
MsgData::View header = dbresponse.response.header();
QueryResult::View qr = header.view2ptr();
long long cursorid = qr.getCursorId();
@@ -86,7 +94,7 @@ Message makeLegacyExhaustMessage(Message* m, const DbResponse& dbresponse) {
b.appendNum(header.getResponseToMsgId()); // in response to
b.appendNum(static_cast<int>(dbGetMore)); // opCode is OP_GET_MORE
b.appendNum(static_cast<int>(0)); // Must be ZERO (reserved)
- b.appendStr(dbresponse.exhaustNS); // Namespace
+ b.appendStr(StringData(ns)); // Namespace
b.appendNum(static_cast<int>(0)); // ntoreturn
b.appendNum(cursorid); // cursor id from the OP_REPLY
@@ -110,54 +118,31 @@ Message makeExhaustMessage(Message requestMsg, DbResponse* dbresponse) {
return Message();
}
- const bool checksumPresent = OpMsg::isFlagSet(requestMsg, OpMsg::kChecksumPresent);
- if (dbresponse->shouldRunAgainForExhaust) {
- Message exhaustMessage;
-
- if (auto nextInvocation = dbresponse->nextInvocation) {
- // The command provided a new BSONObj for the next invocation.
- OpMsgBuilder builder;
- builder.setBody(*nextInvocation);
- exhaustMessage = builder.finish();
- } else {
- // Reuse the previous invocation for the next invocation.
- OpMsg::removeChecksum(&requestMsg);
- exhaustMessage = requestMsg;
- }
-
- // The id of the response is used as the request id of this 'synthetic' request. Re-checksum
- // if needed.
- exhaustMessage.header().setId(dbresponse->response.header().getId());
- exhaustMessage.header().setResponseToMsgId(
- dbresponse->response.header().getResponseToMsgId());
- OpMsg::setFlag(&exhaustMessage, OpMsg::kExhaustSupported);
- if (checksumPresent) {
- OpMsg::appendChecksum(&exhaustMessage);
- }
-
- OpMsg::removeChecksum(&dbresponse->response);
- // Indicate that the response is part of an exhaust stream. Re-checksum if needed.
- OpMsg::setFlag(&dbresponse->response, OpMsg::kMoreToCome);
- if (checksumPresent) {
- OpMsg::appendChecksum(&dbresponse->response);
- }
-
- return exhaustMessage;
+ if (!dbresponse->shouldRunAgainForExhaust) {
+ return Message();
}
- // TODO SERVER-44517: Everything below this line should go away, and we should return Message(),
- // since the command did not set a next invocation.
+ const bool checksumPresent = OpMsg::isFlagSet(requestMsg, OpMsg::kChecksumPresent);
+ Message exhaustMessage;
- // Only support exhaust for 'getMore' commands.
- auto request = OpMsgRequest::parse(requestMsg);
- if (request.getCommandName() != "getMore"_sd) {
- return Message();
+ if (auto nextInvocation = dbresponse->nextInvocation) {
+ // The command provided a new BSONObj for the next invocation.
+ OpMsgBuilder builder;
+ builder.setBody(*nextInvocation);
+ exhaustMessage = builder.finish();
+ } else {
+ // Reuse the previous invocation for the next invocation.
+ OpMsg::removeChecksum(&requestMsg);
+ exhaustMessage = requestMsg;
}
- // A returned cursor id of '0' indicates that the cursor is exhausted and so the exhaust stream
- // should be terminated. Also make sure the cursor namespace is valid.
- if (dbresponse->exhaustCursorId == 0 || dbresponse->exhaustNS.empty()) {
- return Message();
+ // The id of the response is used as the request id of this 'synthetic' request. Re-checksum
+ // if needed.
+ exhaustMessage.header().setId(dbresponse->response.header().getId());
+ exhaustMessage.header().setResponseToMsgId(dbresponse->response.header().getResponseToMsgId());
+ OpMsg::setFlag(&exhaustMessage, OpMsg::kExhaustSupported);
+ if (checksumPresent) {
+ OpMsg::appendChecksum(&exhaustMessage);
}
OpMsg::removeChecksum(&dbresponse->response);
@@ -167,16 +152,7 @@ Message makeExhaustMessage(Message requestMsg, DbResponse* dbresponse) {
OpMsg::appendChecksum(&dbresponse->response);
}
- // Return an augmented form of the initial request, which is to be used as the next request to
- // be processed by the database. The id of the response is used as the request id of this
- // 'synthetic' request. Re-checksum if needed.
- OpMsg::removeChecksum(&requestMsg);
- requestMsg.header().setId(dbresponse->response.header().getId());
- requestMsg.header().setResponseToMsgId(dbresponse->response.header().getResponseToMsgId());
- if (checksumPresent) {
- OpMsg::appendChecksum(&requestMsg);
- }
- return requestMsg;
+ return exhaustMessage;
}
} // namespace
@@ -501,12 +477,11 @@ void ServiceStateMachine::_processMessage(ThreadGuard guard) {
#endif
}
- // If the incoming message has the exhaust flag set and is a 'getMore' command, then we
- // bypass the normal RPC behavior. We will sink the response to the network, but we also
- // synthesize a new 'getMore' request, as if we sourced a new message from the network. This
- // new request is sent to the database once again to be processed. This cycle repeats as
- // long as the associated cursor is not exhausted. Once it is exhausted, we will send a
- // final response, terminating the exhaust stream.
+ // If the incoming message has the exhaust flag set, then we bypass the normal RPC behavior.
+ // We will sink the response to the network, but we also synthesize a new request, as if we
+ // sourced a new message from the network. This new request is sent to the database once
+ // again to be processed. This cycle repeats as long as the command indicates the exhaust
+ // stream should continue.
_inMessage = makeExhaustMessage(_inMessage, &dbresponse);
_inExhaust = !_inMessage.empty();
diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp
index 2cc54156c6b..caf064f91c6 100644
--- a/src/mongo/transport/service_state_machine_test.cpp
+++ b/src/mongo/transport/service_state_machine_test.cpp
@@ -93,10 +93,8 @@ public:
if (OpMsg::isFlagSet(request, OpMsg::kExhaustSupported)) {
auto reply = OpMsg::parse(res);
auto cursorObj = reply.body.getObjectField("cursor");
- if (reply.body["ok"].trueValue() && !cursorObj.isEmpty()) {
- dbResponse.exhaustCursorId = cursorObj.getField("id").numberLong();
- dbResponse.exhaustNS = cursorObj.getField("ns").String();
- }
+ dbResponse.shouldRunAgainForExhaust = reply.body["ok"].trueValue() &&
+ !cursorObj.isEmpty() && (cursorObj.getField("id").numberLong() != 0);
}
dbResponse.response = res;
@@ -490,130 +488,6 @@ TEST_F(ServiceStateMachineFixture, TestGetMoreWithExhaust) {
ASSERT_EQ(firstResponseId, msg.header().getResponseToMsgId());
}
-TEST_F(ServiceStateMachineFixture, TestGetMoreWithExhaustAndEmptyResponseNamespace) {
- // Construct a 'getMore' OP_MSG request with the exhaust flag set.
- const int32_t initRequestId = 1;
- const long long cursorId = 42;
- const std::string nss = "test.coll";
- Message getMoreWithExhaust = getMoreRequestWithExhaust(nss, cursorId, initRequestId);
-
- // Construct a 'getMore' response with an empty namespace.
- BSONObj getMoreTerminalResBody = BSON("ok" << 1 << "cursor"
- << BSON("id" << 42 << "ns"
- << ""
- << "nextBatch" << BSONArray()));
- Message getMoreTerminalRes = buildOpMsg(getMoreTerminalResBody);
-
- // Let the 'getMore' request be sourced from the network, processed in the database, and
- // and the response sunk to the TransportLayer.
- runSourceAndSinkTest(
- _tl, _sep, getMoreWithExhaust, getMoreTerminalRes, State::Process, State::Source);
-
- // Check the last sunk message.
- auto msg = _tl->getLastSunk();
- ASSERT(!msg.empty());
- auto reply = OpMsg::parse(msg);
- ASSERT_FALSE(OpMsg::isFlagSet(msg, OpMsg::kMoreToCome));
- ASSERT_BSONOBJ_EQ(getMoreTerminalResBody, reply.body);
-}
-
-TEST_F(ServiceStateMachineFixture, TestGetMoreWithExhaustAndEmptyCursorObjectInResponse) {
- // Construct a 'getMore' OP_MSG request with the exhaust flag set.
- const int32_t initRequestId = 1;
- const long long cursorId = 42;
- const std::string nss = "test.coll";
- Message getMoreWithExhaust = getMoreRequestWithExhaust(nss, cursorId, initRequestId);
-
- // Construct a 'getMore' response with an empty cursor object.
- BSONObj getMoreTerminalResBody = BSON("ok" << 1 << "cursor" << BSONObj());
- Message getMoreTerminalRes = buildOpMsg(getMoreTerminalResBody);
-
- // Let the 'getMore' request be sourced from the network, processed in the database, and
- // and the response sunk to the TransportLayer.
- runSourceAndSinkTest(
- _tl, _sep, getMoreWithExhaust, getMoreTerminalRes, State::Process, State::Source);
-
- // Check the last sunk message.
- auto msg = _tl->getLastSunk();
- ASSERT(!msg.empty());
- auto reply = OpMsg::parse(msg);
- ASSERT_FALSE(OpMsg::isFlagSet(msg, OpMsg::kMoreToCome));
- ASSERT_BSONOBJ_EQ(getMoreTerminalResBody, reply.body);
-}
-
-TEST_F(ServiceStateMachineFixture, TestGetMoreWithExhaustAndNoCursorFieldInResponse) {
- // Construct a 'getMore' OP_MSG request with the exhaust flag set.
- const int32_t initRequestId = 1;
- const long long cursorId = 42;
- const std::string nss = "test.coll";
- Message getMoreWithExhaust = getMoreRequestWithExhaust(nss, cursorId, initRequestId);
-
- // Construct a 'getMore' response with no 'cursor' field.
- BSONObj getMoreTerminalResBody = BSON("ok" << 1);
- Message getMoreTerminalRes = buildOpMsg(getMoreTerminalResBody);
-
- // Let the 'getMore' request be sourced from the network, processed in the database, and
- // and the response sunk to the TransportLayer.
- runSourceAndSinkTest(
- _tl, _sep, getMoreWithExhaust, getMoreTerminalRes, State::Process, State::Source);
-
- // Check the last sunk message.
- auto msg = _tl->getLastSunk();
- ASSERT(!msg.empty());
- auto reply = OpMsg::parse(msg);
- ASSERT_FALSE(OpMsg::isFlagSet(msg, OpMsg::kMoreToCome));
- ASSERT_BSONOBJ_EQ(getMoreTerminalResBody, reply.body);
-}
-
-TEST_F(ServiceStateMachineFixture, TestGetMoreWithExhaustAndNonOKResponse) {
- // Construct a 'getMore' OP_MSG request with the exhaust flag set.
- const int32_t initRequestId = 1;
- const long long cursorId = 42;
- const std::string nss = "test.coll";
- Message getMoreWithExhaust = getMoreRequestWithExhaust(nss, cursorId, initRequestId);
-
- // Construct a 'getMore' response with a non-ok response.
- BSONObj getMoreTerminalResBody = BSON(
- "ok" << 0 << "cursor" << BSON("id" << 42 << "ns" << nss << "nextBatch" << BSONArray()));
- Message getMoreTerminalRes = buildOpMsg(getMoreTerminalResBody);
-
- // Let the 'getMore' request be sourced from the network, processed in the database, and
- // and the response sunk to the TransportLayer.
- runSourceAndSinkTest(
- _tl, _sep, getMoreWithExhaust, getMoreTerminalRes, State::Process, State::Source);
-
- // Check the last sunk message.
- auto msg = _tl->getLastSunk();
- ASSERT(!msg.empty());
- auto reply = OpMsg::parse(msg);
- ASSERT_FALSE(OpMsg::isFlagSet(msg, OpMsg::kMoreToCome));
- ASSERT_BSONOBJ_EQ(getMoreTerminalResBody, reply.body);
-}
-
-
-TEST_F(ServiceStateMachineFixture, TestExhaustOnlySupportedForGetMoreCommand) {
- // Construct a 'find' OP_MSG request with the exhaust flag set. We should ignore exhaust flags
- // for non 'getMore' commands.
- const std::string nss = "test.coll";
- Message findWithExhaust = buildOpMsg(BSON("find" << nss));
- OpMsg::setFlag(&findWithExhaust, OpMsg::kExhaustSupported);
-
- // Construct an OK response.
- Message findRes = buildOpMsg(BSON(
- "ok" << 1 << "cursor" << BSON("id" << 42 << "ns" << nss << "firstBatch" << BSONArray())));
-
- // Let the 'find' request be sourced from the network, processed in the database, and
- // and the response sunk to the TransportLayer.
- runSourceAndSinkTest(_tl, _sep, findWithExhaust, findRes, State::Process, State::Source);
-
- // Check the last sunk message.
- auto msg = _tl->getLastSunk();
- ASSERT(!msg.empty());
- auto reply = OpMsg::parse(msg);
- ASSERT_FALSE(OpMsg::isFlagSet(msg, OpMsg::kMoreToCome));
- ASSERT_EQ(1, reply.body.getIntField("ok"));
-}
-
TEST_F(ServiceStateMachineFixture, TestThrowHandling) {
_sep->setUassertInHandler();