diff options
author | Tess Avitabile <tess.avitabile@mongodb.com> | 2019-12-20 18:43:14 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-12-20 18:43:14 +0000 |
commit | 3fea6b339770dcdead06803b0c794553c25b94fb (patch) | |
tree | 09f84db37faa08da48957d967ba597fb7207ebaa /src/mongo | |
parent | c5bd0178db8f5ea16f7df4e78a52fda56926d0b9 (diff) | |
download | mongo-3fea6b339770dcdead06803b0c794553c25b94fb.tar.gz |
SERVER-44517 Refactor exhaust cursors on top of isMaster code changes
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/commands/getmore_cmd.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/dbmessage.h | 7 | ||||
-rw-r--r-- | src/mongo/db/query/find.cpp | 27 | ||||
-rw-r--r-- | src/mongo/db/query/find.h | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_info.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 17 | ||||
-rw-r--r-- | src/mongo/dbtests/querytests.cpp | 4 | ||||
-rw-r--r-- | src/mongo/rpc/op_msg_integration_test.cpp | 170 | ||||
-rw-r--r-- | src/mongo/s/commands/strategy.cpp | 9 | ||||
-rw-r--r-- | src/mongo/transport/service_state_machine.cpp | 97 | ||||
-rw-r--r-- | src/mongo/transport/service_state_machine_test.cpp | 130 |
11 files changed, 219 insertions, 258 deletions
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index d0129eaba5c..1ef70f0d2d0 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -634,6 +634,12 @@ public: if (respondWithId) { cursorFreer.dismiss(); + + if (opCtx->isExhaust()) { + // Indicate that an exhaust message should be generated and the previous BSONObj + // command parameters should be reused as the next BSONObj command parameters. + reply->setNextInvocation(boost::none); + } } } diff --git a/src/mongo/db/dbmessage.h b/src/mongo/db/dbmessage.h index f66239637d9..d2db642f298 100644 --- a/src/mongo/db/dbmessage.h +++ b/src/mongo/db/dbmessage.h @@ -452,13 +452,6 @@ struct DbResponse { // The next invocation for an exhaust command. If this is boost::none, the previous invocation // should be reused for the next invocation. boost::optional<BSONObj> nextInvocation; - - // TODO SERVER-44517: Remove 'exhaustNS' and 'exhaustCursorId'. Instead, GetMoreCmd::run() - // should set 'shouldRunAgainForExhaust'. - std::string exhaustNS; // Namespace of cursor if exhaust mode, else "". - // Cursor ID when running on exhaust mode. Defaults to '0', indicating - // that the cursor is exhausted. - long long exhaustCursorId = 0; }; /** diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index 0d903bd3086..d118bcb7ed3 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -289,6 +289,8 @@ Message getMore(OperationContext* opCtx, uassertStatusOK(statusWithCursorPin.getStatus()); auto cursorPin = std::move(statusWithCursorPin.getValue()); + opCtx->setExhaust(cursorPin->queryOptions() & QueryOption_Exhaust); + if (cursorPin->lockPolicy() == ClientCursorParams::LockPolicy::kLocksInternally) { if (!nss.isCollectionlessCursorNamespace()) { AutoGetDb autoDb(opCtx, nss.db(), MODE_IS); @@ -534,7 +536,8 @@ Message getMore(OperationContext* opCtx, exec->detachFromOperationContext(); LOG(5) << "getMore saving client cursor ended with state " << PlanExecutor::statestr(state); - *exhaust = cursorPin->queryOptions() & QueryOption_Exhaust; + // Set 'exhaust' if the client requested exhaust and the cursor is not exhausted. + *exhaust = opCtx->isExhaust(); // We assume that cursors created through a DBDirectClient are always used from their // original OperationContext, so we do not need to move time to and from the cursor. @@ -568,10 +571,10 @@ Message getMore(OperationContext* opCtx, return Message(bb.release()); } -std::string runQuery(OperationContext* opCtx, - QueryMessage& q, - const NamespaceString& nss, - Message& result) { +bool runQuery(OperationContext* opCtx, + QueryMessage& q, + const NamespaceString& nss, + Message& result) { CurOp& curOp = *CurOp::get(opCtx); curOp.ensureStarted(); @@ -610,6 +613,8 @@ std::string runQuery(OperationContext* opCtx, Collection* const collection = ctx.getCollection(); const QueryRequest& qr = cq->getQueryRequest(); + opCtx->setExhaust(qr.isExhaust()); + { // Allow the query to run on secondaries if the read preference permits it. If no read // preference was specified, allow the query to run iff slaveOk has been set. @@ -649,7 +654,7 @@ std::string runQuery(OperationContext* opCtx, qr.setStartingFrom(0); qr.setNReturned(1); result.setData(bb.release()); - return ""; + return false; } // Handle query option $maxTimeMS (not used with commands). @@ -744,8 +749,9 @@ std::string runQuery(OperationContext* opCtx, LOG(5) << "caching executor with cursorid " << ccId << " after returning " << numResults << " results"; - // TODO document - if (qr.isExhaust()) { + // Set curOp.debug().exhaust if the client requested exhaust and the cursor is not + // exhausted. + if (opCtx->isExhaust()) { curOp.debug().exhaust = true; } @@ -778,8 +784,9 @@ std::string runQuery(OperationContext* opCtx, // Add the results from the query into the output buffer. result.setData(bb.release()); - // curOp.debug().exhaust is set above. - return curOp.debug().exhaust ? nss.ns() : ""; + // curOp.debug().exhaust is set above if the client requested exhaust and the cursor is not + // exhausted. + return curOp.debug().exhaust; } } // namespace mongo diff --git a/src/mongo/db/query/find.h b/src/mongo/db/query/find.h index 0a1e1a15078..2522ae338c5 100644 --- a/src/mongo/db/query/find.h +++ b/src/mongo/db/query/find.h @@ -98,11 +98,8 @@ Message getMore(OperationContext* opCtx, bool* isCursorAuthorized); /** - * Run the query 'q' and place the result in 'result'. + * Run the query 'q' and place the result in 'result'. Returns true if in exhaust mode. */ -std::string runQuery(OperationContext* opCtx, - QueryMessage& q, - const NamespaceString& ns, - Message& result); +bool runQuery(OperationContext* opCtx, QueryMessage& q, const NamespaceString& ns, Message& result); } // namespace mongo diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp index e3fe5de44dc..e058a7442aa 100644 --- a/src/mongo/db/repl/replication_info.cpp +++ b/src/mongo/db/repl/replication_info.cpp @@ -462,7 +462,8 @@ public: if (clientTopologyVersion->getProcessId() == currentTopologyVersion->getProcessId() && clientTopologyVersion->getCounter() == currentTopologyVersion->getCounter()) { - // Indicate that the previous invocation should be reused for the next invocation. + // Indicate that an exhaust message should be generated and the previous BSONObj + // command parameters should be reused as the next BSONObj command parameters. replyBuilder->setNextInvocation(boost::none); } else { BSONObjBuilder nextInvocationBuilder; diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 58e3d1be0cf..4570d44bb74 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -1181,17 +1181,12 @@ DbResponse receivedCommands(OperationContext* opCtx, DbResponse dbResponse; - dbResponse.shouldRunAgainForExhaust = replyBuilder->shouldRunAgainForExhaust(); - dbResponse.nextInvocation = replyBuilder->getNextInvocation(); - - // TODO SERVER-44517: This block can be removed once 'exhaustNS' and 'exhaustCursorId' are - // removed from DbResponse. if (OpMsg::isFlagSet(message, OpMsg::kExhaustSupported)) { auto responseObj = replyBuilder->getBodyBuilder().asTempObj(); - auto cursorObj = responseObj.getObjectField("cursor"); - if (responseObj.getField("ok").trueValue() && !cursorObj.isEmpty()) { - dbResponse.exhaustNS = cursorObj.getField("ns").String(); - dbResponse.exhaustCursorId = cursorObj.getField("id").numberLong(); + + if (responseObj.getField("ok").trueValue()) { + dbResponse.shouldRunAgainForExhaust = replyBuilder->shouldRunAgainForExhaust(); + dbResponse.nextInvocation = replyBuilder->getNextInvocation(); } } @@ -1222,7 +1217,7 @@ DbResponse receivedQuery(OperationContext* opCtx, audit::logQueryAuthzCheck(client, nss, q.query, status.code()); uassertStatusOK(status); - dbResponse.exhaustNS = runQuery(opCtx, q, nss, dbResponse.response); + dbResponse.shouldRunAgainForExhaust = runQuery(opCtx, q, nss, dbResponse.response); } catch (const AssertionException& e) { behaviors.handleException(e, opCtx); @@ -1384,7 +1379,7 @@ DbResponse receivedGetMore(OperationContext* opCtx, if (exhaust) { curop.debug().exhaust = true; - dbresponse.exhaustNS = ns; + dbresponse.shouldRunAgainForExhaust = true; } return dbresponse; diff --git a/src/mongo/dbtests/querytests.cpp b/src/mongo/dbtests/querytests.cpp index c7936436f49..6e6fa668950 100644 --- a/src/mongo/dbtests/querytests.cpp +++ b/src/mongo/dbtests/querytests.cpp @@ -1918,9 +1918,7 @@ public: DbMessage dbMessage(message); QueryMessage queryMessage(dbMessage); Message result; - string exhaust = runQuery(&_opCtx, queryMessage, NamespaceString(ns()), result); - ASSERT(exhaust.size()); - ASSERT_EQUALS(string(ns()), exhaust); + ASSERT_TRUE(runQuery(&_opCtx, queryMessage, NamespaceString(ns()), result)); } }; diff --git a/src/mongo/rpc/op_msg_integration_test.cpp b/src/mongo/rpc/op_msg_integration_test.cpp index 4edb0fab6f3..2009a609876 100644 --- a/src/mongo/rpc/op_msg_integration_test.cpp +++ b/src/mongo/rpc/op_msg_integration_test.cpp @@ -295,7 +295,7 @@ void exhaustGetMoreTest(bool enableChecksum) { unittest::getFixtureConnectionString().connect("integration_test", errMsg)); uassert(ErrorCodes::SocketException, errMsg, conn); - // Only test exhaust against a single server. + // Only test exhaust against a standalone. if (conn->isReplicaSetMember() || conn->isMongos()) { return; } @@ -312,10 +312,11 @@ void exhaustGetMoreTest(bool enableChecksum) { // Insert a few documents. for (int i = 0; i < 5; i++) { - conn->insert(nss.toString(), BSON("_id" << i), 0); + conn->insert(nss.toString(), BSON("_id" << i)); } - // Issue a find request to open a cursor but return 0 documents. + // Issue a find request to open a cursor but return 0 documents. Specify a sort in order to + // guarantee their return order. auto findCmd = BSON("find" << nss.coll() << "batchSize" << 0 << "sort" << BSON("_id" << 1)); auto opMsgRequest = OpMsgRequest::fromDBAndBody(nss.db(), findCmd); auto request = opMsgRequest.serialize(); @@ -383,7 +384,144 @@ TEST(OpMsg, ServerHandlesExhaustGetMoreCorrectlyWithChecksum) { exhaustGetMoreTest(true); } -void exhaustIsMasterTest(bool enableChecksum) { +TEST(OpMsg, FindIgnoresExhaust) { + std::string errMsg; + auto conn = std::unique_ptr<DBClientBase>( + unittest::getFixtureConnectionString().connect("integration_test", errMsg)); + uassert(ErrorCodes::SocketException, errMsg, conn); + + // Only test exhaust against a standalone. + if (conn->isReplicaSetMember() || conn->isMongos()) { + return; + } + + NamespaceString nss("test", "coll"); + + conn->dropCollection(nss.toString()); + + // Insert a few documents. + for (int i = 0; i < 5; i++) { + conn->insert(nss.toString(), BSON("_id" << i)); + } + + // Issue a find request with exhaust flag. Returns 0 documents. + auto findCmd = BSON("find" << nss.coll() << "batchSize" << 0); + auto opMsgRequest = OpMsgRequest::fromDBAndBody(nss.db(), findCmd); + auto request = opMsgRequest.serialize(); + OpMsg::setFlag(&request, OpMsg::kExhaustSupported); + + Message reply; + ASSERT(conn->call(request, reply)); + auto res = OpMsg::parse(reply).body; + ASSERT(res["cursor"]["firstBatch"].Array().empty()); + // The response should not have set moreToCome. We only expect getMore response to set + // 'moreToCome'. + ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); +} + +TEST(OpMsg, ServerDoesNotSetMoreToComeOnErrorInGetMore) { + std::string errMsg; + auto conn = std::unique_ptr<DBClientBase>( + unittest::getFixtureConnectionString().connect("integration_test", errMsg)); + uassert(ErrorCodes::SocketException, errMsg, conn); + + // Only test exhaust against a standalone. + if (conn->isReplicaSetMember() || conn->isMongos()) { + return; + } + + NamespaceString nss("test", "coll"); + + conn->dropCollection(nss.toString()); + + // Insert a few documents. + for (int i = 0; i < 5; i++) { + conn->insert(nss.toString(), BSON("_id" << i)); + } + + // Issue a find request to open a cursor but return 0 documents. + auto findCmd = BSON("find" << nss.coll() << "batchSize" << 0); + auto opMsgRequest = OpMsgRequest::fromDBAndBody(nss.db(), findCmd); + auto request = opMsgRequest.serialize(); + + Message reply; + ASSERT(conn->call(request, reply)); + auto res = OpMsg::parse(reply).body; + const long long cursorId = res["cursor"]["id"].numberLong(); + ASSERT(res["cursor"]["firstBatch"].Array().empty()); + ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + + // Drop the collection, so that the next getMore will error. + conn->dropCollection(nss.toString()); + + // Construct getMore request with exhaust flag. + int batchSize = 2; + GetMoreRequest gmr(nss, cursorId, batchSize, boost::none, boost::none, boost::none); + opMsgRequest = OpMsgRequest::fromDBAndBody(nss.db(), gmr.toBSON()); + request = opMsgRequest.serialize(); + OpMsg::setFlag(&request, OpMsg::kExhaustSupported); + + // Run getMore. This should not start an exhaust stream. + ASSERT(conn->call(request, reply)); + // The response should not have set moreToCome. + ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + res = OpMsg::parse(reply).body; + ASSERT_NOT_OK(getStatusFromCommandResult(res)); +} + +TEST(OpMsg, MongosIgnoresExhaustForGetMore) { + std::string errMsg; + auto conn = std::unique_ptr<DBClientBase>( + unittest::getFixtureConnectionString().connect("integration_test", errMsg)); + uassert(ErrorCodes::SocketException, errMsg, conn); + + if (!conn->isMongos()) { + return; + } + + NamespaceString nss("test", "coll"); + + conn->dropCollection(nss.toString()); + + // Insert a few documents. + for (int i = 0; i < 5; i++) { + conn->insert(nss.toString(), BSON("_id" << i)); + } + + // Issue a find request to open a cursor but return 0 documents. Specify a sort in order to + // guarantee their return order. + auto findCmd = BSON("find" << nss.coll() << "batchSize" << 0 << "sort" << BSON("_id" << 1)); + auto opMsgRequest = OpMsgRequest::fromDBAndBody(nss.db(), findCmd); + auto request = opMsgRequest.serialize(); + + Message reply; + ASSERT(conn->call(request, reply)); + auto res = OpMsg::parse(reply).body; + const long long cursorId = res["cursor"]["id"].numberLong(); + ASSERT(res["cursor"]["firstBatch"].Array().empty()); + ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + + // Construct getMore request with exhaust flag. + int batchSize = 2; + GetMoreRequest gmr(nss, cursorId, batchSize, boost::none, boost::none, boost::none); + opMsgRequest = OpMsgRequest::fromDBAndBody(nss.db(), gmr.toBSON()); + request = opMsgRequest.serialize(); + OpMsg::setFlag(&request, OpMsg::kExhaustSupported); + + // Run getMore. This should not start an exhaust stream. + ASSERT(conn->call(request, reply)); + // The response should not have set moreToCome. + ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + res = OpMsg::parse(reply).body; + ASSERT_OK(getStatusFromCommandResult(res)); + ASSERT_EQ(res["cursor"]["id"].numberLong(), cursorId); + std::vector<BSONElement> nextBatch = res["cursor"]["nextBatch"].Array(); + ASSERT_EQ(nextBatch.size(), 2U); + ASSERT_BSONOBJ_EQ(nextBatch[0].embeddedObject(), BSON("_id" << 0)); + ASSERT_BSONOBJ_EQ(nextBatch[1].embeddedObject(), BSON("_id" << 1)); +} + +TEST(OpMsg, ServerHandlesExhaustIsMasterCorrectly) { std::string errMsg; auto fixtureConn = std::unique_ptr<DBClientBase>( unittest::getFixtureConnectionString().connect("integration_test", errMsg)); @@ -399,12 +537,6 @@ void exhaustIsMasterTest(bool enableChecksum) { DBClientBase* conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn(); ASSERT(conn); - if (!enableChecksum) { - disableClientChecksum(); - } - - ON_BLOCK_EXIT([&] { enableClientChecksum(); }); - auto tickSource = getGlobalServiceContext()->getTickSource(); // Issue an isMaster command without a topology version. @@ -418,8 +550,6 @@ void exhaustIsMasterTest(bool enableChecksum) { ASSERT_OK(getStatusFromCommandResult(res)); auto topologyVersion = res["topologyVersion"].Obj().getOwned(); ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); - // Reply has checksum if and only if the request did. - ASSERT_EQ(OpMsg::isFlagSet(reply, OpMsg::kChecksumPresent), enableChecksum); // Construct isMaster command with topologyVersion, maxAwaitTimeMS, and exhaust. isMasterCmd = @@ -436,7 +566,6 @@ void exhaustIsMasterTest(bool enableChecksum) { ASSERT_GT(tickSource->ticksTo<Milliseconds>(afterFirstResponse - beforeExhaustCommand), Milliseconds(50)); ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); - ASSERT_EQ(OpMsg::isFlagSet(reply, OpMsg::kChecksumPresent), enableChecksum); res = OpMsg::parse(reply).body; ASSERT_OK(getStatusFromCommandResult(res)); auto nextTopologyVersion = res["topologyVersion"].Obj().getOwned(); @@ -450,7 +579,6 @@ void exhaustIsMasterTest(bool enableChecksum) { ASSERT_GT(tickSource->ticksTo<Milliseconds>(afterSecondResponse - afterFirstResponse), Milliseconds(50)); ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); - ASSERT_EQ(OpMsg::isFlagSet(reply, OpMsg::kChecksumPresent), enableChecksum); res = OpMsg::parse(reply).body; ASSERT_OK(getStatusFromCommandResult(res)); nextTopologyVersion = res["topologyVersion"].Obj().getOwned(); @@ -459,16 +587,6 @@ void exhaustIsMasterTest(bool enableChecksum) { // The exhaust stream would continue indefinitely. } -TEST(OpMsg, ServerHandlesExhaustIsMasterCorrectly) { - exhaustIsMasterTest(false); -} - -// TODO SERVER-44517: The checksum logic will be unified for exhaust commands, so we don't need to -// test checksum for both exhaust isMaster and exhaust getMore. -TEST(OpMsg, ServerHandlesExhaustIsMasterCorrectlyWithChecksum) { - exhaustIsMasterTest(true); -} - TEST(OpMsg, ServerHandlesExhaustIsMasterWithTopologyChange) { std::string errMsg; auto fixtureConn = std::unique_ptr<DBClientBase>( @@ -576,7 +694,7 @@ TEST(OpMsg, ExhaustWithDBClientCursorBehavesCorrectly) { unittest::getFixtureConnectionString().connect("integration_test", errMsg)); uassert(ErrorCodes::SocketException, errMsg, conn); - // Only test exhaust against a single server. + // Only test exhaust against a standalone. if (conn->isReplicaSetMember() || conn->isMongos()) { return; } @@ -588,7 +706,7 @@ TEST(OpMsg, ExhaustWithDBClientCursorBehavesCorrectly) { unittest::log() << "Inserting " << nDocs << " documents."; for (int i = 0; i < nDocs; i++) { auto doc = BSON("_id" << i); - conn->insert(nss.toString(), doc, 0); + conn->insert(nss.toString(), doc); } ASSERT_EQ(conn->count(nss), size_t(nDocs)); diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index c80ed6e0956..61f692f315a 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -873,14 +873,11 @@ DbResponse Strategy::clientCommand(OperationContext* opCtx, const Message& m) { } DbResponse dbResponse; - dbResponse.shouldRunAgainForExhaust = reply->shouldRunAgainForExhaust(); - dbResponse.nextInvocation = reply->getNextInvocation(); if (OpMsg::isFlagSet(m, OpMsg::kExhaustSupported)) { auto responseObj = reply->getBodyBuilder().asTempObj(); - auto cursorObj = responseObj.getObjectField("cursor"); - if (responseObj.getField("ok").trueValue() && !cursorObj.isEmpty()) { - dbResponse.exhaustNS = cursorObj.getField("ns").String(); - dbResponse.exhaustCursorId = cursorObj.getField("id").numberLong(); + if (responseObj.getField("ok").trueValue()) { + dbResponse.shouldRunAgainForExhaust = reply->shouldRunAgainForExhaust(); + dbResponse.nextInvocation = reply->getNextInvocation(); } } dbResponse.response = reply->done(); diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp index 68ff7e8c1d9..4ca997d96ed 100644 --- a/src/mongo/transport/service_state_machine.cpp +++ b/src/mongo/transport/service_state_machine.cpp @@ -67,10 +67,18 @@ Message makeLegacyExhaustMessage(Message* m, const DbResponse& dbresponse) { // OP_QUERY responses are always of type OP_REPLY. invariant(dbresponse.response.operation() == opReply); - if (dbresponse.exhaustNS.empty()) { + if (!dbresponse.shouldRunAgainForExhaust) { return Message(); } + // Legacy find operations via the OP_QUERY/OP_GET_MORE network protocol never provide the next + // invocation for exhaust. + invariant(!dbresponse.nextInvocation); + + DbMessage dbmsg(*m); + invariant(dbmsg.messageShouldHaveNs()); + const char* ns = dbmsg.getns(); + MsgData::View header = dbresponse.response.header(); QueryResult::View qr = header.view2ptr(); long long cursorid = qr.getCursorId(); @@ -86,7 +94,7 @@ Message makeLegacyExhaustMessage(Message* m, const DbResponse& dbresponse) { b.appendNum(header.getResponseToMsgId()); // in response to b.appendNum(static_cast<int>(dbGetMore)); // opCode is OP_GET_MORE b.appendNum(static_cast<int>(0)); // Must be ZERO (reserved) - b.appendStr(dbresponse.exhaustNS); // Namespace + b.appendStr(StringData(ns)); // Namespace b.appendNum(static_cast<int>(0)); // ntoreturn b.appendNum(cursorid); // cursor id from the OP_REPLY @@ -110,54 +118,31 @@ Message makeExhaustMessage(Message requestMsg, DbResponse* dbresponse) { return Message(); } - const bool checksumPresent = OpMsg::isFlagSet(requestMsg, OpMsg::kChecksumPresent); - if (dbresponse->shouldRunAgainForExhaust) { - Message exhaustMessage; - - if (auto nextInvocation = dbresponse->nextInvocation) { - // The command provided a new BSONObj for the next invocation. - OpMsgBuilder builder; - builder.setBody(*nextInvocation); - exhaustMessage = builder.finish(); - } else { - // Reuse the previous invocation for the next invocation. - OpMsg::removeChecksum(&requestMsg); - exhaustMessage = requestMsg; - } - - // The id of the response is used as the request id of this 'synthetic' request. Re-checksum - // if needed. - exhaustMessage.header().setId(dbresponse->response.header().getId()); - exhaustMessage.header().setResponseToMsgId( - dbresponse->response.header().getResponseToMsgId()); - OpMsg::setFlag(&exhaustMessage, OpMsg::kExhaustSupported); - if (checksumPresent) { - OpMsg::appendChecksum(&exhaustMessage); - } - - OpMsg::removeChecksum(&dbresponse->response); - // Indicate that the response is part of an exhaust stream. Re-checksum if needed. - OpMsg::setFlag(&dbresponse->response, OpMsg::kMoreToCome); - if (checksumPresent) { - OpMsg::appendChecksum(&dbresponse->response); - } - - return exhaustMessage; + if (!dbresponse->shouldRunAgainForExhaust) { + return Message(); } - // TODO SERVER-44517: Everything below this line should go away, and we should return Message(), - // since the command did not set a next invocation. + const bool checksumPresent = OpMsg::isFlagSet(requestMsg, OpMsg::kChecksumPresent); + Message exhaustMessage; - // Only support exhaust for 'getMore' commands. - auto request = OpMsgRequest::parse(requestMsg); - if (request.getCommandName() != "getMore"_sd) { - return Message(); + if (auto nextInvocation = dbresponse->nextInvocation) { + // The command provided a new BSONObj for the next invocation. + OpMsgBuilder builder; + builder.setBody(*nextInvocation); + exhaustMessage = builder.finish(); + } else { + // Reuse the previous invocation for the next invocation. + OpMsg::removeChecksum(&requestMsg); + exhaustMessage = requestMsg; } - // A returned cursor id of '0' indicates that the cursor is exhausted and so the exhaust stream - // should be terminated. Also make sure the cursor namespace is valid. - if (dbresponse->exhaustCursorId == 0 || dbresponse->exhaustNS.empty()) { - return Message(); + // The id of the response is used as the request id of this 'synthetic' request. Re-checksum + // if needed. + exhaustMessage.header().setId(dbresponse->response.header().getId()); + exhaustMessage.header().setResponseToMsgId(dbresponse->response.header().getResponseToMsgId()); + OpMsg::setFlag(&exhaustMessage, OpMsg::kExhaustSupported); + if (checksumPresent) { + OpMsg::appendChecksum(&exhaustMessage); } OpMsg::removeChecksum(&dbresponse->response); @@ -167,16 +152,7 @@ Message makeExhaustMessage(Message requestMsg, DbResponse* dbresponse) { OpMsg::appendChecksum(&dbresponse->response); } - // Return an augmented form of the initial request, which is to be used as the next request to - // be processed by the database. The id of the response is used as the request id of this - // 'synthetic' request. Re-checksum if needed. - OpMsg::removeChecksum(&requestMsg); - requestMsg.header().setId(dbresponse->response.header().getId()); - requestMsg.header().setResponseToMsgId(dbresponse->response.header().getResponseToMsgId()); - if (checksumPresent) { - OpMsg::appendChecksum(&requestMsg); - } - return requestMsg; + return exhaustMessage; } } // namespace @@ -501,12 +477,11 @@ void ServiceStateMachine::_processMessage(ThreadGuard guard) { #endif } - // If the incoming message has the exhaust flag set and is a 'getMore' command, then we - // bypass the normal RPC behavior. We will sink the response to the network, but we also - // synthesize a new 'getMore' request, as if we sourced a new message from the network. This - // new request is sent to the database once again to be processed. This cycle repeats as - // long as the associated cursor is not exhausted. Once it is exhausted, we will send a - // final response, terminating the exhaust stream. + // If the incoming message has the exhaust flag set, then we bypass the normal RPC behavior. + // We will sink the response to the network, but we also synthesize a new request, as if we + // sourced a new message from the network. This new request is sent to the database once + // again to be processed. This cycle repeats as long as the command indicates the exhaust + // stream should continue. _inMessage = makeExhaustMessage(_inMessage, &dbresponse); _inExhaust = !_inMessage.empty(); diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp index 2cc54156c6b..caf064f91c6 100644 --- a/src/mongo/transport/service_state_machine_test.cpp +++ b/src/mongo/transport/service_state_machine_test.cpp @@ -93,10 +93,8 @@ public: if (OpMsg::isFlagSet(request, OpMsg::kExhaustSupported)) { auto reply = OpMsg::parse(res); auto cursorObj = reply.body.getObjectField("cursor"); - if (reply.body["ok"].trueValue() && !cursorObj.isEmpty()) { - dbResponse.exhaustCursorId = cursorObj.getField("id").numberLong(); - dbResponse.exhaustNS = cursorObj.getField("ns").String(); - } + dbResponse.shouldRunAgainForExhaust = reply.body["ok"].trueValue() && + !cursorObj.isEmpty() && (cursorObj.getField("id").numberLong() != 0); } dbResponse.response = res; @@ -490,130 +488,6 @@ TEST_F(ServiceStateMachineFixture, TestGetMoreWithExhaust) { ASSERT_EQ(firstResponseId, msg.header().getResponseToMsgId()); } -TEST_F(ServiceStateMachineFixture, TestGetMoreWithExhaustAndEmptyResponseNamespace) { - // Construct a 'getMore' OP_MSG request with the exhaust flag set. - const int32_t initRequestId = 1; - const long long cursorId = 42; - const std::string nss = "test.coll"; - Message getMoreWithExhaust = getMoreRequestWithExhaust(nss, cursorId, initRequestId); - - // Construct a 'getMore' response with an empty namespace. - BSONObj getMoreTerminalResBody = BSON("ok" << 1 << "cursor" - << BSON("id" << 42 << "ns" - << "" - << "nextBatch" << BSONArray())); - Message getMoreTerminalRes = buildOpMsg(getMoreTerminalResBody); - - // Let the 'getMore' request be sourced from the network, processed in the database, and - // and the response sunk to the TransportLayer. - runSourceAndSinkTest( - _tl, _sep, getMoreWithExhaust, getMoreTerminalRes, State::Process, State::Source); - - // Check the last sunk message. - auto msg = _tl->getLastSunk(); - ASSERT(!msg.empty()); - auto reply = OpMsg::parse(msg); - ASSERT_FALSE(OpMsg::isFlagSet(msg, OpMsg::kMoreToCome)); - ASSERT_BSONOBJ_EQ(getMoreTerminalResBody, reply.body); -} - -TEST_F(ServiceStateMachineFixture, TestGetMoreWithExhaustAndEmptyCursorObjectInResponse) { - // Construct a 'getMore' OP_MSG request with the exhaust flag set. - const int32_t initRequestId = 1; - const long long cursorId = 42; - const std::string nss = "test.coll"; - Message getMoreWithExhaust = getMoreRequestWithExhaust(nss, cursorId, initRequestId); - - // Construct a 'getMore' response with an empty cursor object. - BSONObj getMoreTerminalResBody = BSON("ok" << 1 << "cursor" << BSONObj()); - Message getMoreTerminalRes = buildOpMsg(getMoreTerminalResBody); - - // Let the 'getMore' request be sourced from the network, processed in the database, and - // and the response sunk to the TransportLayer. - runSourceAndSinkTest( - _tl, _sep, getMoreWithExhaust, getMoreTerminalRes, State::Process, State::Source); - - // Check the last sunk message. - auto msg = _tl->getLastSunk(); - ASSERT(!msg.empty()); - auto reply = OpMsg::parse(msg); - ASSERT_FALSE(OpMsg::isFlagSet(msg, OpMsg::kMoreToCome)); - ASSERT_BSONOBJ_EQ(getMoreTerminalResBody, reply.body); -} - -TEST_F(ServiceStateMachineFixture, TestGetMoreWithExhaustAndNoCursorFieldInResponse) { - // Construct a 'getMore' OP_MSG request with the exhaust flag set. - const int32_t initRequestId = 1; - const long long cursorId = 42; - const std::string nss = "test.coll"; - Message getMoreWithExhaust = getMoreRequestWithExhaust(nss, cursorId, initRequestId); - - // Construct a 'getMore' response with no 'cursor' field. - BSONObj getMoreTerminalResBody = BSON("ok" << 1); - Message getMoreTerminalRes = buildOpMsg(getMoreTerminalResBody); - - // Let the 'getMore' request be sourced from the network, processed in the database, and - // and the response sunk to the TransportLayer. - runSourceAndSinkTest( - _tl, _sep, getMoreWithExhaust, getMoreTerminalRes, State::Process, State::Source); - - // Check the last sunk message. - auto msg = _tl->getLastSunk(); - ASSERT(!msg.empty()); - auto reply = OpMsg::parse(msg); - ASSERT_FALSE(OpMsg::isFlagSet(msg, OpMsg::kMoreToCome)); - ASSERT_BSONOBJ_EQ(getMoreTerminalResBody, reply.body); -} - -TEST_F(ServiceStateMachineFixture, TestGetMoreWithExhaustAndNonOKResponse) { - // Construct a 'getMore' OP_MSG request with the exhaust flag set. - const int32_t initRequestId = 1; - const long long cursorId = 42; - const std::string nss = "test.coll"; - Message getMoreWithExhaust = getMoreRequestWithExhaust(nss, cursorId, initRequestId); - - // Construct a 'getMore' response with a non-ok response. - BSONObj getMoreTerminalResBody = BSON( - "ok" << 0 << "cursor" << BSON("id" << 42 << "ns" << nss << "nextBatch" << BSONArray())); - Message getMoreTerminalRes = buildOpMsg(getMoreTerminalResBody); - - // Let the 'getMore' request be sourced from the network, processed in the database, and - // and the response sunk to the TransportLayer. - runSourceAndSinkTest( - _tl, _sep, getMoreWithExhaust, getMoreTerminalRes, State::Process, State::Source); - - // Check the last sunk message. - auto msg = _tl->getLastSunk(); - ASSERT(!msg.empty()); - auto reply = OpMsg::parse(msg); - ASSERT_FALSE(OpMsg::isFlagSet(msg, OpMsg::kMoreToCome)); - ASSERT_BSONOBJ_EQ(getMoreTerminalResBody, reply.body); -} - - -TEST_F(ServiceStateMachineFixture, TestExhaustOnlySupportedForGetMoreCommand) { - // Construct a 'find' OP_MSG request with the exhaust flag set. We should ignore exhaust flags - // for non 'getMore' commands. - const std::string nss = "test.coll"; - Message findWithExhaust = buildOpMsg(BSON("find" << nss)); - OpMsg::setFlag(&findWithExhaust, OpMsg::kExhaustSupported); - - // Construct an OK response. - Message findRes = buildOpMsg(BSON( - "ok" << 1 << "cursor" << BSON("id" << 42 << "ns" << nss << "firstBatch" << BSONArray()))); - - // Let the 'find' request be sourced from the network, processed in the database, and - // and the response sunk to the TransportLayer. - runSourceAndSinkTest(_tl, _sep, findWithExhaust, findRes, State::Process, State::Source); - - // Check the last sunk message. - auto msg = _tl->getLastSunk(); - ASSERT(!msg.empty()); - auto reply = OpMsg::parse(msg); - ASSERT_FALSE(OpMsg::isFlagSet(msg, OpMsg::kMoreToCome)); - ASSERT_EQ(1, reply.body.getIntField("ok")); -} - TEST_F(ServiceStateMachineFixture, TestThrowHandling) { _sep->setUassertInHandler(); |