diff options
author | Tess Avitabile <tess.avitabile@mongodb.com> | 2019-12-16 21:15:41 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-12-16 21:15:41 +0000 |
commit | ec498c5968974012d50d6d04c26cd2cd0db87d22 (patch) | |
tree | 39a80b70f400f27bd4af3d83748112bfed7cc3b4 | |
parent | d9f793354be29ef51e7d32e8cb46e7bf84b99d66 (diff) | |
download | mongo-ec498c5968974012d50d6d04c26cd2cd0db87d22.tar.gz |
SERVER-44510 Implement exhaust isMaster
-rw-r--r-- | src/mongo/db/commands.cpp | 39 | ||||
-rw-r--r-- | src/mongo/db/commands.h | 42 | ||||
-rw-r--r-- | src/mongo/db/dbmessage.h | 13 | ||||
-rw-r--r-- | src/mongo/db/operation_context.h | 17 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_info.cpp | 65 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 7 | ||||
-rw-r--r-- | src/mongo/rpc/op_msg_integration_test.cpp | 196 | ||||
-rw-r--r-- | src/mongo/rpc/reply_builder_interface.cpp | 14 | ||||
-rw-r--r-- | src/mongo/rpc/reply_builder_interface.h | 25 | ||||
-rw-r--r-- | src/mongo/s/commands/strategy.cpp | 2 | ||||
-rw-r--r-- | src/mongo/tools/bridge.cpp | 54 | ||||
-rw-r--r-- | src/mongo/transport/service_state_machine.cpp | 48 |
12 files changed, 428 insertions, 94 deletions
diff --git a/src/mongo/db/commands.cpp b/src/mongo/db/commands.cpp index 469c71c49a2..e67e02b8c85 100644 --- a/src/mongo/db/commands.cpp +++ b/src/mongo/db/commands.cpp @@ -614,9 +614,11 @@ void CommandInvocation::checkAuthorization(OperationContext* opCtx, ////////////////////////////////////////////////////////////// // Command -class BasicCommand::Invocation final : public CommandInvocation { +class BasicCommandWithReplyBuilderInterface::Invocation final : public CommandInvocation { public: - Invocation(OperationContext*, const OpMsgRequest& request, BasicCommand* command) + Invocation(OperationContext*, + const OpMsgRequest& request, + BasicCommandWithReplyBuilderInterface* command) : CommandInvocation(command), _command(command), _request(&request), @@ -625,10 +627,11 @@ public: private: void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) override { opCtx->lockState()->setDebugInfo(redact(_request->body)); - BSONObjBuilder bob = result->getBodyBuilder(); - bool ok = _command->run(opCtx, _dbName, _request->body, bob); - if (!ok) + bool ok = _command->runWithReplyBuilder(opCtx, _dbName, _request->body, result); + if (!ok) { + BSONObjBuilder bob = result->getBodyBuilder(); CommandHelpers::appendSimpleCommandStatus(bob, ok); + } } void explain(OperationContext* opCtx, @@ -666,7 +669,7 @@ private: return _request->body; } - BasicCommand* const _command; + BasicCommandWithReplyBuilderInterface* const _command; const OpMsgRequest* const _request; const std::string _dbName; }; @@ -688,8 +691,8 @@ void Command::snipForLogging(mutablebson::Document* cmdObj) const { } -std::unique_ptr<CommandInvocation> BasicCommand::parse(OperationContext* opCtx, - const OpMsgRequest& request) { +std::unique_ptr<CommandInvocation> BasicCommandWithReplyBuilderInterface::parse( + OperationContext* opCtx, const OpMsgRequest& request) { CommandHelpers::uassertNoDocumentSequences(getName(), request); return std::make_unique<Invocation>(opCtx, request, this); } @@ -701,22 +704,22 @@ Command::Command(StringData name, StringData oldName) globalCommandRegistry()->registerCommand(this, name, oldName); } -Status BasicCommand::explain(OperationContext* opCtx, - const OpMsgRequest& request, - ExplainOptions::Verbosity verbosity, - rpc::ReplyBuilderInterface* result) const { +Status BasicCommandWithReplyBuilderInterface::explain(OperationContext* opCtx, + const OpMsgRequest& request, + ExplainOptions::Verbosity verbosity, + rpc::ReplyBuilderInterface* result) const { return {ErrorCodes::IllegalOperation, str::stream() << "Cannot explain cmd: " << getName()}; } -Status BasicCommand::checkAuthForOperation(OperationContext* opCtx, - const std::string& dbname, - const BSONObj& cmdObj) const { +Status BasicCommandWithReplyBuilderInterface::checkAuthForOperation(OperationContext* opCtx, + const std::string& dbname, + const BSONObj& cmdObj) const { return checkAuthForCommand(opCtx->getClient(), dbname, cmdObj); } -Status BasicCommand::checkAuthForCommand(Client* client, - const std::string& dbname, - const BSONObj& cmdObj) const { +Status BasicCommandWithReplyBuilderInterface::checkAuthForCommand(Client* client, + const std::string& dbname, + const BSONObj& cmdObj) const { std::vector<Privilege> privileges; this->addRequiredPrivileges(dbname, cmdObj, &privileges); if (AuthorizationSession::get(client)->isAuthorizedForPrivileges(privileges)) diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h index 69ececd390c..efc4230556f 100644 --- a/src/mongo/db/commands.h +++ b/src/mongo/db/commands.h @@ -554,9 +554,10 @@ private: /** * A subclass of Command that only cares about the BSONObj body and doesn't need access to document - * sequences. + * sequences. Commands should implement this class if they require access to the + * ReplyBuilderInterface (e.g. to set the next invocation for an exhaust command). */ -class BasicCommand : public Command { +class BasicCommandWithReplyBuilderInterface : public Command { private: class Invocation; @@ -576,15 +577,12 @@ public: // /** - * run the given command - * implement this... - * - * return value is true if succeeded. if false, set errmsg text. + * Runs the given command. Returns true upon success. */ - virtual bool run(OperationContext* opCtx, - const std::string& db, - const BSONObj& cmdObj, - BSONObjBuilder& result) = 0; + virtual bool runWithReplyBuilder(OperationContext* opCtx, + const std::string& db, + const BSONObj& cmdObj, + rpc::ReplyBuilderInterface* replyBuilder) = 0; /** * Commands which can be explained override this method. Any operation which has a query @@ -690,6 +688,30 @@ private: }; /** + * Commands should implement this class if they do not require access to the ReplyBuilderInterface. + */ +class BasicCommand : public BasicCommandWithReplyBuilderInterface { +public: + using BasicCommandWithReplyBuilderInterface::BasicCommandWithReplyBuilderInterface; + + /** + * Runs the given command. Returns true upon success. + */ + virtual bool run(OperationContext* opCtx, + const std::string& db, + const BSONObj& cmdObj, + BSONObjBuilder& result) = 0; + + bool runWithReplyBuilder(OperationContext* opCtx, + const std::string& db, + const BSONObj& cmdObj, + rpc::ReplyBuilderInterface* replyBuilder) final { + auto result = replyBuilder->getBodyBuilder(); + return run(opCtx, db, cmdObj, result); + } +}; + +/** * Deprecated. Do not add new subclasses. */ class ErrmsgCommandDeprecated : public BasicCommand { diff --git a/src/mongo/db/dbmessage.h b/src/mongo/db/dbmessage.h index 9002dd97607..f66239637d9 100644 --- a/src/mongo/db/dbmessage.h +++ b/src/mongo/db/dbmessage.h @@ -443,7 +443,18 @@ Message makeGetMoreMessage(StringData ns, long long cursorId, int nToReturn, int * Order of fields makes DbResponse{funcReturningMessage()} valid. */ struct DbResponse { - Message response; // If empty, nothing will be returned to the client. + // If empty, nothing will be returned to the client. + Message response; + + // For exhaust commands, indicates whether the command should be run again. + bool shouldRunAgainForExhaust = false; + + // The next invocation for an exhaust command. If this is boost::none, the previous invocation + // should be reused for the next invocation. + boost::optional<BSONObj> nextInvocation; + + // TODO SERVER-44517: Remove 'exhaustNS' and 'exhaustCursorId'. Instead, GetMoreCmd::run() + // should set 'shouldRunAgainForExhaust'. std::string exhaustNS; // Namespace of cursor if exhaust mode, else "". // Cursor ID when running on exhaust mode. Defaults to '0', indicating // that the cursor is exhausted. diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index f0ff525e304..8fc158a7b81 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -395,6 +395,20 @@ public: return _comment ? boost::optional<BSONElement>(_comment->firstElement()) : boost::none; } + /** + * Sets whether this operation is an exhaust command. + */ + void setExhaust(bool exhaust) { + _exhaust = exhaust; + } + + /** + * Returns whether this operation is an exhaust command. + */ + bool isExhaust() const { + return _exhaust; + } + private: StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil( stdx::condition_variable& cv, BasicLockableAdapter m, Date_t deadline) noexcept override; @@ -528,6 +542,9 @@ private: // If populated, this is an owned singleton BSONObj whose only field, 'comment', is a copy of // the 'comment' field from the input command object. boost::optional<BSONObj> _comment; + + // Whether this operation is an exhaust command. + bool _exhaust = false; }; namespace repl { diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp index f2007792326..fb86c2a74a4 100644 --- a/src/mongo/db/repl/replication_info.cpp +++ b/src/mongo/db/repl/replication_info.cpp @@ -75,7 +75,16 @@ using std::unique_ptr; namespace repl { namespace { -void appendReplicationInfo(OperationContext* opCtx, BSONObjBuilder& result, int level) { +/** + * Appends replication-related fields to the isMaster response. Returns the topology version that + * was included in the response. + * TODO SERVER-44813: Always return a topology version, including on standalones. + */ +boost::optional<TopologyVersion> appendReplicationInfo(OperationContext* opCtx, + BSONObjBuilder& result, + int level) { + TopologyVersion topologyVersion; + ReplicationCoordinator* replCoord = ReplicationCoordinator::get(opCtx); if (replCoord->getSettings().usingReplSets()) { const auto& horizonParams = SplitHorizon::getParameters(opCtx->getClient()); @@ -85,7 +94,7 @@ void appendReplicationInfo(OperationContext* opCtx, BSONObjBuilder& result, int if (level) { replCoord->appendSlaveInfoData(&result); } - return; + return isMasterResponse.getTopologyVersion(); } result.appendBool("ismaster", @@ -154,6 +163,8 @@ void appendReplicationInfo(OperationContext* opCtx, BSONObjBuilder& result, int replCoord->appendSlaveInfoData(&result); } + + return boost::none; } class ReplicationInfoServerStatus : public ServerStatusSection { @@ -213,9 +224,9 @@ public: } } oplogInfoServerStatus; -class CmdIsMaster final : public BasicCommand { +class CmdIsMaster final : public BasicCommandWithReplyBuilderInterface { public: - CmdIsMaster() : BasicCommand("isMaster", "ismaster") {} + CmdIsMaster() : BasicCommandWithReplyBuilderInterface("isMaster", "ismaster") {} bool requiresAuth() const final { return false; @@ -238,10 +249,10 @@ public: const BSONObj& cmdObj, std::vector<Privilege>* out) const final {} // No auth required - bool run(OperationContext* opCtx, - const string&, - const BSONObj& cmdObj, - BSONObjBuilder& result) final { + bool runWithReplyBuilder(OperationContext* opCtx, + const string&, + const BSONObj& cmdObj, + rpc::ReplyBuilderInterface* replyBuilder) final { CommandHelpers::handleMarkKillOnClientDisconnect(opCtx); // TODO Unwind after SERVER-41070 @@ -365,12 +376,13 @@ public: // present if and only if topologyVersion is present in the request. auto topologyVersionElement = cmdObj["topologyVersion"]; auto maxAwaitTimeMSField = cmdObj["maxAwaitTimeMS"]; + boost::optional<TopologyVersion> clientTopologyVersion; if (topologyVersionElement && maxAwaitTimeMSField) { - auto topologyVersion = TopologyVersion::parse(IDLParserErrorContext("TopologyVersion"), - topologyVersionElement.Obj()); + clientTopologyVersion = TopologyVersion::parse(IDLParserErrorContext("TopologyVersion"), + topologyVersionElement.Obj()); uassert(31372, "topologyVersion must have a non-negative counter", - topologyVersion.getCounter() >= 0); + clientTopologyVersion->getCounter() >= 0); long long maxAwaitTimeMS; uassertStatusOK(bsonExtractIntegerField(cmdObj, "maxAwaitTimeMS", &maxAwaitTimeMS)); @@ -390,7 +402,8 @@ public: !topologyVersionElement && !maxAwaitTimeMSField); } - appendReplicationInfo(opCtx, result, 0); + auto result = replyBuilder->getBodyBuilder(); + auto currentTopologyVersion = appendReplicationInfo(opCtx, result, 0); if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { const int configServerModeNumber = 2; @@ -432,6 +445,34 @@ public: auto& saslMechanismRegistry = SASLServerMechanismRegistry::get(opCtx->getServiceContext()); saslMechanismRegistry.advertiseMechanismNamesForUser(opCtx, cmdObj, &result); + // TODO SERVER-44813: currentTopologyVersion should always be set. + if (opCtx->isExhaust() && currentTopologyVersion) { + LOG(3) << "Using exhaust for isMaster protocol"; + + uassert(51756, + "An isMaster request with exhaust must specify 'maxAwaitTimeMS'", + maxAwaitTimeMSField); + invariant(clientTopologyVersion); + + if (clientTopologyVersion->getProcessId() == currentTopologyVersion->getProcessId() && + clientTopologyVersion->getCounter() == currentTopologyVersion->getCounter()) { + // Indicate that the previous invocation should be reused for the next invocation. + replyBuilder->setNextInvocation(boost::none); + } else { + BSONObjBuilder nextInvocationBuilder; + for (auto&& elt : cmdObj) { + if (elt.fieldNameStringData() == "topologyVersion"_sd) { + BSONObjBuilder topologyVersionBuilder( + nextInvocationBuilder.subobjStart("topologyVersion")); + currentTopologyVersion->serialize(&topologyVersionBuilder); + } else { + nextInvocationBuilder.append(elt); + } + } + replyBuilder->setNextInvocation(nextInvocationBuilder.obj()); + } + } + return true; } } cmdismaster; diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index efae06481bf..ea792fa18ae 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -1146,6 +1146,8 @@ DbResponse receivedCommands(OperationContext* opCtx, CurOp::get(opCtx)->setLogicalOp_inlock(c->getLogicalOp()); } + opCtx->setExhaust(OpMsg::isFlagSet(message, OpMsg::kExhaustSupported)); + execCommandDatabase(opCtx, c, request, replyBuilder.get(), behaviors); } catch (const DBException& ex) { BSONObjBuilder metadataBob; @@ -1178,6 +1180,11 @@ DbResponse receivedCommands(OperationContext* opCtx, DbResponse dbResponse; + dbResponse.shouldRunAgainForExhaust = replyBuilder->shouldRunAgainForExhaust(); + dbResponse.nextInvocation = replyBuilder->getNextInvocation(); + + // TODO SERVER-44517: This block can be removed once 'exhaustNS' and 'exhaustCursorId' are + // removed from DbResponse. if (OpMsg::isFlagSet(message, OpMsg::kExhaustSupported)) { auto responseObj = replyBuilder->getBodyBuilder().asTempObj(); auto cursorObj = responseObj.getObjectField("cursor"); diff --git a/src/mongo/rpc/op_msg_integration_test.cpp b/src/mongo/rpc/op_msg_integration_test.cpp index 8659b7014d9..40a6ee044a8 100644 --- a/src/mongo/rpc/op_msg_integration_test.cpp +++ b/src/mongo/rpc/op_msg_integration_test.cpp @@ -30,6 +30,7 @@ #include "mongo/platform/basic.h" #include "mongo/client/dbclient_connection.h" +#include "mongo/client/dbclient_rs.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/query/getmore_request.h" #include "mongo/rpc/get_status_from_command_result.h" @@ -288,7 +289,7 @@ void enableClientChecksum() { failPoint->setMode(FailPoint::off); } -void exhaustTest(bool enableChecksum) { +void exhaustGetMoreTest(bool enableChecksum) { std::string errMsg; auto conn = std::unique_ptr<DBClientBase>( unittest::getFixtureConnectionString().connect("integration_test", errMsg)); @@ -374,12 +375,197 @@ void exhaustTest(bool enableChecksum) { ASSERT_BSONOBJ_EQ(nextBatch[0].embeddedObject(), BSON("_id" << 4)); } -TEST(OpMsg, ServerHandlesExhaustCorrectly) { - exhaustTest(false); +TEST(OpMsg, ServerHandlesExhaustGetMoreCorrectly) { + exhaustGetMoreTest(false); } -TEST(OpMsg, ServerHandlesExhaustCorrectlyWithChecksum) { - exhaustTest(true); +TEST(OpMsg, ServerHandlesExhaustGetMoreCorrectlyWithChecksum) { + exhaustGetMoreTest(true); +} + +void exhaustIsMasterTest(bool enableChecksum) { + std::string errMsg; + auto fixtureConn = std::unique_ptr<DBClientBase>( + unittest::getFixtureConnectionString().connect("integration_test", errMsg)); + uassert(ErrorCodes::SocketException, errMsg, fixtureConn); + + // TODO SERVER-44813: Run this test on standalone. + // TODO SERVER-44521: Run this test on mongos. + if (!fixtureConn->isReplicaSetMember()) { + return; + } + + // Connect directly to the primary. + DBClientBase* conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn(); + ASSERT(conn); + + if (!enableChecksum) { + disableClientChecksum(); + } + + ON_BLOCK_EXIT([&] { enableClientChecksum(); }); + + auto tickSource = getGlobalServiceContext()->getTickSource(); + + // Issue an isMaster command without a topology version. + auto isMasterCmd = BSON("isMaster" << 1); + auto opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd); + auto request = opMsgRequest.serialize(); + + Message reply; + ASSERT(conn->call(request, reply)); + auto res = OpMsg::parse(reply).body; + ASSERT_OK(getStatusFromCommandResult(res)); + auto topologyVersion = res["topologyVersion"].Obj().getOwned(); + ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + // Reply has checksum if and only if the request did. + ASSERT_EQ(OpMsg::isFlagSet(reply, OpMsg::kChecksumPresent), enableChecksum); + + // Construct isMaster command with topologyVersion, maxAwaitTimeMS, and exhaust. + isMasterCmd = + BSON("isMaster" << 1 << "topologyVersion" << topologyVersion << "maxAwaitTimeMS" << 100); + opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd); + request = opMsgRequest.serialize(); + OpMsg::setFlag(&request, OpMsg::kExhaustSupported); + + // Run isMaster command to initiate the exhaust stream. + auto beforeExhaustCommand = tickSource->getTicks(); + ASSERT(conn->call(request, reply)); + auto afterFirstResponse = tickSource->getTicks(); + // Allow for clock skew when testing the response time. + ASSERT_GT(tickSource->ticksTo<Milliseconds>(afterFirstResponse - beforeExhaustCommand), + Milliseconds(50)); + ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + ASSERT_EQ(OpMsg::isFlagSet(reply, OpMsg::kChecksumPresent), enableChecksum); + res = OpMsg::parse(reply).body; + ASSERT_OK(getStatusFromCommandResult(res)); + auto nextTopologyVersion = res["topologyVersion"].Obj().getOwned(); + ASSERT_BSONOBJ_EQ(topologyVersion, nextTopologyVersion); + + // Receive next exhaust message. + auto lastRequestId = reply.header().getId(); + ASSERT_OK(conn->recv(reply, lastRequestId)); + auto afterSecondResponse = tickSource->getTicks(); + // Allow for clock skew when testing the response time. + ASSERT_GT(tickSource->ticksTo<Milliseconds>(afterSecondResponse - afterFirstResponse), + Milliseconds(50)); + ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + ASSERT_EQ(OpMsg::isFlagSet(reply, OpMsg::kChecksumPresent), enableChecksum); + res = OpMsg::parse(reply).body; + ASSERT_OK(getStatusFromCommandResult(res)); + nextTopologyVersion = res["topologyVersion"].Obj().getOwned(); + ASSERT_BSONOBJ_EQ(topologyVersion, nextTopologyVersion); + + // The exhaust stream would continue indefinitely. +} + +TEST(OpMsg, ServerHandlesExhaustIsMasterCorrectly) { + exhaustIsMasterTest(false); +} + +// TODO SERVER-44517: The checksum logic will be unified for exhaust commands, so we don't need to +// test checksum for both exhaust isMaster and exhaust getMore. +TEST(OpMsg, ServerHandlesExhaustIsMasterCorrectlyWithChecksum) { + exhaustIsMasterTest(true); +} + +TEST(OpMsg, ServerHandlesExhaustIsMasterWithTopologyChange) { + std::string errMsg; + auto fixtureConn = std::unique_ptr<DBClientBase>( + unittest::getFixtureConnectionString().connect("integration_test", errMsg)); + uassert(ErrorCodes::SocketException, errMsg, fixtureConn); + + // TODO SERVER-44813: Run this test on standalone. + // TODO SERVER-44521: Run this test on mongos. + if (!fixtureConn->isReplicaSetMember()) { + return; + } + + // Connect directly to the primary. + DBClientBase* conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn(); + ASSERT(conn); + + auto tickSource = getGlobalServiceContext()->getTickSource(); + + // Issue an isMaster command without a topology version. + auto isMasterCmd = BSON("isMaster" << 1); + auto opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd); + auto request = opMsgRequest.serialize(); + + Message reply; + ASSERT(conn->call(request, reply)); + auto res = OpMsg::parse(reply).body; + ASSERT_OK(getStatusFromCommandResult(res)); + auto topologyVersion = res["topologyVersion"].Obj().getOwned(); + ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + + // Construct isMaster command with topologyVersion, maxAwaitTimeMS, and exhaust. Use a different + // processId for the topologyVersion so that the first response is returned immediately. + isMasterCmd = BSON("isMaster" << 1 << "topologyVersion" + << BSON("processId" << OID::gen() << "counter" << 0LL) + << "maxAwaitTimeMS" << 100); + opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd); + request = opMsgRequest.serialize(); + OpMsg::setFlag(&request, OpMsg::kExhaustSupported); + + // Run isMaster command to initiate the exhaust stream. The first response should be received + // immediately. + auto beforeExhaustCommand = tickSource->getTicks(); + ASSERT(conn->call(request, reply)); + auto afterFirstResponse = tickSource->getTicks(); + // TODO SERVER-44514: Change the following assertion to use ASSERT_LT once the server responds + // immediately on a topologyVersion with a different processId. + // Allow for clock skew when testing the response time. + ASSERT_GT(tickSource->ticksTo<Milliseconds>(afterFirstResponse - beforeExhaustCommand), + Milliseconds(50)); + ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + res = OpMsg::parse(reply).body; + ASSERT_OK(getStatusFromCommandResult(res)); + auto nextTopologyVersion = res["topologyVersion"].Obj().getOwned(); + ASSERT_BSONOBJ_EQ(topologyVersion, nextTopologyVersion); + + // Receive next exhaust message. The second response waits for 'maxAwaitTimeMS'. + auto lastRequestId = reply.header().getId(); + ASSERT_OK(conn->recv(reply, lastRequestId)); + auto afterSecondResponse = tickSource->getTicks(); + // Allow for clock skew when testing the response time. + ASSERT_GT(tickSource->ticksTo<Milliseconds>(afterSecondResponse - afterFirstResponse), + Milliseconds(50)); + ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + res = OpMsg::parse(reply).body; + ASSERT_OK(getStatusFromCommandResult(res)); + nextTopologyVersion = res["topologyVersion"].Obj().getOwned(); + ASSERT_BSONOBJ_EQ(topologyVersion, nextTopologyVersion); + + // The exhaust stream would continue indefinitely. +} + +TEST(OpMsg, ServerRejectsExhaustIsMasterWithoutMaxAwaitTimeMS) { + std::string errMsg; + auto fixtureConn = std::unique_ptr<DBClientBase>( + unittest::getFixtureConnectionString().connect("integration_test", errMsg)); + uassert(ErrorCodes::SocketException, errMsg, fixtureConn); + + // TODO SERVER-44813: Run this test on standalone. + // TODO SERVER-44521: Run this test on mongos. + if (!fixtureConn->isReplicaSetMember()) { + return; + } + + // Connect directly to the primary. + DBClientBase* conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn(); + ASSERT(conn); + + // Issue an isMaster command with exhaust but no maxAwaitTimeMS. + auto isMasterCmd = BSON("isMaster" << 1); + auto opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd); + auto request = opMsgRequest.serialize(); + OpMsg::setFlag(&request, OpMsg::kExhaustSupported); + + Message reply; + ASSERT(conn->call(request, reply)); + auto res = OpMsg::parse(reply).body; + ASSERT_NOT_OK(getStatusFromCommandResult(res)); } TEST(OpMsg, ExhaustWithDBClientCursorBehavesCorrectly) { diff --git a/src/mongo/rpc/reply_builder_interface.cpp b/src/mongo/rpc/reply_builder_interface.cpp index c8b2ba8f08c..6f4eddbbebe 100644 --- a/src/mongo/rpc/reply_builder_interface.cpp +++ b/src/mongo/rpc/reply_builder_interface.cpp @@ -89,5 +89,19 @@ ReplyBuilderInterface& ReplyBuilderInterface::setCommandReply(Status nonOKStatus return setRawCommandReply(augmentReplyWithStatus(nonOKStatus, std::move(extraErrorInfo))); } +bool ReplyBuilderInterface::shouldRunAgainForExhaust() const { + return _shouldRunAgainForExhaust; +} + + +boost::optional<BSONObj> ReplyBuilderInterface::getNextInvocation() const { + return _nextInvocation; +} + +void ReplyBuilderInterface::setNextInvocation(boost::optional<BSONObj> nextInvocation) { + _shouldRunAgainForExhaust = true; + _nextInvocation = nextInvocation; +} + } // namespace rpc } // namespace mongo diff --git a/src/mongo/rpc/reply_builder_interface.h b/src/mongo/rpc/reply_builder_interface.h index d7eb747fa30..c0f93503c3c 100644 --- a/src/mongo/rpc/reply_builder_interface.h +++ b/src/mongo/rpc/reply_builder_interface.h @@ -136,8 +136,33 @@ public: */ virtual void reserveBytes(const std::size_t bytes) = 0; + /** + * For exhaust commands, returns whether the command should be run again. + */ + virtual bool shouldRunAgainForExhaust() const; + + /** + * Returns the next invocation for an exhaust command. If this is boost::none, the previous + * invocation should be reused for the next invocation. + */ + virtual boost::optional<BSONObj> getNextInvocation() const; + + /** + * For exhaust commands, indicates that the command should be run again and sets the next + * invocation. + */ + virtual void setNextInvocation(boost::optional<BSONObj> nextInvocation); + protected: ReplyBuilderInterface() = default; + +private: + // For exhaust commands, indicates whether the command should be run again. + bool _shouldRunAgainForExhaust = false; + + // The next invocation for an exhaust command. If this is boost::none, the previous invocation + // should be reused for the next invocation. + boost::optional<BSONObj> _nextInvocation; }; } // namespace rpc diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index db3a8810f22..c80ed6e0956 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -873,6 +873,8 @@ DbResponse Strategy::clientCommand(OperationContext* opCtx, const Message& m) { } DbResponse dbResponse; + dbResponse.shouldRunAgainForExhaust = reply->shouldRunAgainForExhaust(); + dbResponse.nextInvocation = reply->getNextInvocation(); if (OpMsg::isFlagSet(m, OpMsg::kExhaustSupported)) { auto responseObj = reply->getBodyBuilder().asTempObj(); auto cursorObj = responseObj.getObjectField("cursor"); diff --git a/src/mongo/tools/bridge.cpp b/src/mongo/tools/bridge.cpp index a709769cf6f..7a51f983fee 100644 --- a/src/mongo/tools/bridge.cpp +++ b/src/mongo/tools/bridge.cpp @@ -171,14 +171,6 @@ public: return "<unknown>"; } - void setExhaust(bool val) { - _inExhaust = val; - } - - bool inExhaust() const { - return _inExhaust; - } - void extractHostInfo(OpMsgRequest request) { if (_seenFirstMessage) return; @@ -212,7 +204,6 @@ private: PseudoRandom _prng; boost::optional<HostAndPort> _host; bool _seenFirstMessage = false; - bool _inExhaust = false; }; const transport::Session::Decoration<ProxiedConnection> ProxiedConnection::_get = @@ -230,6 +221,18 @@ public: }; DbResponse ServiceEntryPointBridge::handleRequest(OperationContext* opCtx, const Message& request) { + uassert(51754, + "Mongobridge does not support exhaust", + !OpMsg::isFlagSet(request, OpMsg::kExhaustSupported)); + + if (request.operation() == dbQuery) { + DbMessage d(request); + QueryMessage q(d); + if (q.queryOptions & QueryOption_Exhaust) { + uasserted(51755, "Mongobridge does not support exhaust"); + } + } + const auto& source = opCtx->getClient()->session(); auto& dest = ProxiedConnection::get(source); auto brCtx = BridgeContext::get(); @@ -264,25 +267,6 @@ DbResponse ServiceEntryPointBridge::handleRequest(OperationContext* opCtx, const } } - if (dest.inExhaust()) { - DbMessage dbm(request); - - auto response = uassertStatusOK(dest->sourceMessage()); - if (response.operation() == dbCompressed) { - MessageCompressorManager compressorMgr; - response = uassertStatusOK(compressorMgr.decompressMessage(response)); - } - - MsgData::View header = response.header(); - QueryResult::View qr = header.view2ptr(); - if (qr.getCursorId()) { - return {std::move(response)}; - } else { - dest.setExhaust(false); - return {Message(), dbm.getns()}; - } - } - const bool isFireAndForgetCommand = OpMsg::isFlagSet(request, OpMsg::kMoreToCome); boost::optional<OpMsgRequest> cmdRequest; @@ -372,22 +356,10 @@ DbResponse ServiceEntryPointBridge::handleRequest(OperationContext* opCtx, const return {Message()}; } - std::string exhaustNS; - if (request.operation() == dbQuery) { - DbMessage d(request); - QueryMessage q(d); - dest.setExhaust(q.queryOptions & QueryOption_Exhaust); - if (dest.inExhaust()) { - exhaustNS = d.getns(); - } - } else { - dest.setExhaust(false); - } - // The original checksum won't be valid once the network layer replaces requestId. Remove it // because the network layer re-checksums the response. OpMsg::removeChecksum(&response); - return {std::move(response), exhaustNS}; + return {std::move(response)}; } else { return {Message()}; } diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp index 5f8da8368fe..68ff7e8c1d9 100644 --- a/src/mongo/transport/service_state_machine.cpp +++ b/src/mongo/transport/service_state_machine.cpp @@ -97,12 +97,9 @@ Message makeLegacyExhaustMessage(Message* m, const DbResponse& dbresponse) { /** * Given a request and its already generated response, checks for exhaust flags. If exhaust is - * allowed, modifies the given request message to produce the subsequent exhaust message, and - * modifies the response message to indicate it is part of an exhaust stream. Returns the modified - * request message for it to be used as the subsequent, 'synthetic' exhaust request. Returns an - * empty message if exhaust is not allowed. - * - * Currently only supports exhaust for 'getMore' commands. + * allowed, produces the subsequent request message, and modifies the response message to indicate + * it is part of an exhaust stream. Returns the subsequent request message, which is known as a + * 'synthetic' exhaust request. Returns an empty message if exhaust is not allowed. */ Message makeExhaustMessage(Message requestMsg, DbResponse* dbresponse) { if (requestMsg.operation() == dbQuery) { @@ -113,6 +110,44 @@ Message makeExhaustMessage(Message requestMsg, DbResponse* dbresponse) { return Message(); } + const bool checksumPresent = OpMsg::isFlagSet(requestMsg, OpMsg::kChecksumPresent); + if (dbresponse->shouldRunAgainForExhaust) { + Message exhaustMessage; + + if (auto nextInvocation = dbresponse->nextInvocation) { + // The command provided a new BSONObj for the next invocation. + OpMsgBuilder builder; + builder.setBody(*nextInvocation); + exhaustMessage = builder.finish(); + } else { + // Reuse the previous invocation for the next invocation. + OpMsg::removeChecksum(&requestMsg); + exhaustMessage = requestMsg; + } + + // The id of the response is used as the request id of this 'synthetic' request. Re-checksum + // if needed. + exhaustMessage.header().setId(dbresponse->response.header().getId()); + exhaustMessage.header().setResponseToMsgId( + dbresponse->response.header().getResponseToMsgId()); + OpMsg::setFlag(&exhaustMessage, OpMsg::kExhaustSupported); + if (checksumPresent) { + OpMsg::appendChecksum(&exhaustMessage); + } + + OpMsg::removeChecksum(&dbresponse->response); + // Indicate that the response is part of an exhaust stream. Re-checksum if needed. + OpMsg::setFlag(&dbresponse->response, OpMsg::kMoreToCome); + if (checksumPresent) { + OpMsg::appendChecksum(&dbresponse->response); + } + + return exhaustMessage; + } + + // TODO SERVER-44517: Everything below this line should go away, and we should return Message(), + // since the command did not set a next invocation. + // Only support exhaust for 'getMore' commands. auto request = OpMsgRequest::parse(requestMsg); if (request.getCommandName() != "getMore"_sd) { @@ -125,7 +160,6 @@ Message makeExhaustMessage(Message requestMsg, DbResponse* dbresponse) { return Message(); } - const bool checksumPresent = OpMsg::isFlagSet(requestMsg, OpMsg::kChecksumPresent); OpMsg::removeChecksum(&dbresponse->response); // Indicate that the response is part of an exhaust stream. Re-checksum if needed. OpMsg::setFlag(&dbresponse->response, OpMsg::kMoreToCome); |