summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorTess Avitabile <tess.avitabile@mongodb.com>2019-12-16 21:15:41 +0000
committerevergreen <evergreen@mongodb.com>2019-12-16 21:15:41 +0000
commitec498c5968974012d50d6d04c26cd2cd0db87d22 (patch)
tree39a80b70f400f27bd4af3d83748112bfed7cc3b4 /src
parentd9f793354be29ef51e7d32e8cb46e7bf84b99d66 (diff)
downloadmongo-ec498c5968974012d50d6d04c26cd2cd0db87d22.tar.gz
SERVER-44510 Implement exhaust isMaster
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/commands.cpp39
-rw-r--r--src/mongo/db/commands.h42
-rw-r--r--src/mongo/db/dbmessage.h13
-rw-r--r--src/mongo/db/operation_context.h17
-rw-r--r--src/mongo/db/repl/replication_info.cpp65
-rw-r--r--src/mongo/db/service_entry_point_common.cpp7
-rw-r--r--src/mongo/rpc/op_msg_integration_test.cpp196
-rw-r--r--src/mongo/rpc/reply_builder_interface.cpp14
-rw-r--r--src/mongo/rpc/reply_builder_interface.h25
-rw-r--r--src/mongo/s/commands/strategy.cpp2
-rw-r--r--src/mongo/tools/bridge.cpp54
-rw-r--r--src/mongo/transport/service_state_machine.cpp48
12 files changed, 428 insertions, 94 deletions
diff --git a/src/mongo/db/commands.cpp b/src/mongo/db/commands.cpp
index 469c71c49a2..e67e02b8c85 100644
--- a/src/mongo/db/commands.cpp
+++ b/src/mongo/db/commands.cpp
@@ -614,9 +614,11 @@ void CommandInvocation::checkAuthorization(OperationContext* opCtx,
//////////////////////////////////////////////////////////////
// Command
-class BasicCommand::Invocation final : public CommandInvocation {
+class BasicCommandWithReplyBuilderInterface::Invocation final : public CommandInvocation {
public:
- Invocation(OperationContext*, const OpMsgRequest& request, BasicCommand* command)
+ Invocation(OperationContext*,
+ const OpMsgRequest& request,
+ BasicCommandWithReplyBuilderInterface* command)
: CommandInvocation(command),
_command(command),
_request(&request),
@@ -625,10 +627,11 @@ public:
private:
void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) override {
opCtx->lockState()->setDebugInfo(redact(_request->body));
- BSONObjBuilder bob = result->getBodyBuilder();
- bool ok = _command->run(opCtx, _dbName, _request->body, bob);
- if (!ok)
+ bool ok = _command->runWithReplyBuilder(opCtx, _dbName, _request->body, result);
+ if (!ok) {
+ BSONObjBuilder bob = result->getBodyBuilder();
CommandHelpers::appendSimpleCommandStatus(bob, ok);
+ }
}
void explain(OperationContext* opCtx,
@@ -666,7 +669,7 @@ private:
return _request->body;
}
- BasicCommand* const _command;
+ BasicCommandWithReplyBuilderInterface* const _command;
const OpMsgRequest* const _request;
const std::string _dbName;
};
@@ -688,8 +691,8 @@ void Command::snipForLogging(mutablebson::Document* cmdObj) const {
}
-std::unique_ptr<CommandInvocation> BasicCommand::parse(OperationContext* opCtx,
- const OpMsgRequest& request) {
+std::unique_ptr<CommandInvocation> BasicCommandWithReplyBuilderInterface::parse(
+ OperationContext* opCtx, const OpMsgRequest& request) {
CommandHelpers::uassertNoDocumentSequences(getName(), request);
return std::make_unique<Invocation>(opCtx, request, this);
}
@@ -701,22 +704,22 @@ Command::Command(StringData name, StringData oldName)
globalCommandRegistry()->registerCommand(this, name, oldName);
}
-Status BasicCommand::explain(OperationContext* opCtx,
- const OpMsgRequest& request,
- ExplainOptions::Verbosity verbosity,
- rpc::ReplyBuilderInterface* result) const {
+Status BasicCommandWithReplyBuilderInterface::explain(OperationContext* opCtx,
+ const OpMsgRequest& request,
+ ExplainOptions::Verbosity verbosity,
+ rpc::ReplyBuilderInterface* result) const {
return {ErrorCodes::IllegalOperation, str::stream() << "Cannot explain cmd: " << getName()};
}
-Status BasicCommand::checkAuthForOperation(OperationContext* opCtx,
- const std::string& dbname,
- const BSONObj& cmdObj) const {
+Status BasicCommandWithReplyBuilderInterface::checkAuthForOperation(OperationContext* opCtx,
+ const std::string& dbname,
+ const BSONObj& cmdObj) const {
return checkAuthForCommand(opCtx->getClient(), dbname, cmdObj);
}
-Status BasicCommand::checkAuthForCommand(Client* client,
- const std::string& dbname,
- const BSONObj& cmdObj) const {
+Status BasicCommandWithReplyBuilderInterface::checkAuthForCommand(Client* client,
+ const std::string& dbname,
+ const BSONObj& cmdObj) const {
std::vector<Privilege> privileges;
this->addRequiredPrivileges(dbname, cmdObj, &privileges);
if (AuthorizationSession::get(client)->isAuthorizedForPrivileges(privileges))
diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h
index 69ececd390c..efc4230556f 100644
--- a/src/mongo/db/commands.h
+++ b/src/mongo/db/commands.h
@@ -554,9 +554,10 @@ private:
/**
* A subclass of Command that only cares about the BSONObj body and doesn't need access to document
- * sequences.
+ * sequences. Commands should implement this class if they require access to the
+ * ReplyBuilderInterface (e.g. to set the next invocation for an exhaust command).
*/
-class BasicCommand : public Command {
+class BasicCommandWithReplyBuilderInterface : public Command {
private:
class Invocation;
@@ -576,15 +577,12 @@ public:
//
/**
- * run the given command
- * implement this...
- *
- * return value is true if succeeded. if false, set errmsg text.
+ * Runs the given command. Returns true upon success.
*/
- virtual bool run(OperationContext* opCtx,
- const std::string& db,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) = 0;
+ virtual bool runWithReplyBuilder(OperationContext* opCtx,
+ const std::string& db,
+ const BSONObj& cmdObj,
+ rpc::ReplyBuilderInterface* replyBuilder) = 0;
/**
* Commands which can be explained override this method. Any operation which has a query
@@ -690,6 +688,30 @@ private:
};
/**
+ * Commands should implement this class if they do not require access to the ReplyBuilderInterface.
+ */
+class BasicCommand : public BasicCommandWithReplyBuilderInterface {
+public:
+ using BasicCommandWithReplyBuilderInterface::BasicCommandWithReplyBuilderInterface;
+
+ /**
+ * Runs the given command. Returns true upon success.
+ */
+ virtual bool run(OperationContext* opCtx,
+ const std::string& db,
+ const BSONObj& cmdObj,
+ BSONObjBuilder& result) = 0;
+
+ bool runWithReplyBuilder(OperationContext* opCtx,
+ const std::string& db,
+ const BSONObj& cmdObj,
+ rpc::ReplyBuilderInterface* replyBuilder) final {
+ auto result = replyBuilder->getBodyBuilder();
+ return run(opCtx, db, cmdObj, result);
+ }
+};
+
+/**
* Deprecated. Do not add new subclasses.
*/
class ErrmsgCommandDeprecated : public BasicCommand {
diff --git a/src/mongo/db/dbmessage.h b/src/mongo/db/dbmessage.h
index 9002dd97607..f66239637d9 100644
--- a/src/mongo/db/dbmessage.h
+++ b/src/mongo/db/dbmessage.h
@@ -443,7 +443,18 @@ Message makeGetMoreMessage(StringData ns, long long cursorId, int nToReturn, int
* Order of fields makes DbResponse{funcReturningMessage()} valid.
*/
struct DbResponse {
- Message response; // If empty, nothing will be returned to the client.
+ // If empty, nothing will be returned to the client.
+ Message response;
+
+ // For exhaust commands, indicates whether the command should be run again.
+ bool shouldRunAgainForExhaust = false;
+
+ // The next invocation for an exhaust command. If this is boost::none, the previous invocation
+ // should be reused for the next invocation.
+ boost::optional<BSONObj> nextInvocation;
+
+ // TODO SERVER-44517: Remove 'exhaustNS' and 'exhaustCursorId'. Instead, GetMoreCmd::run()
+ // should set 'shouldRunAgainForExhaust'.
std::string exhaustNS; // Namespace of cursor if exhaust mode, else "".
// Cursor ID when running on exhaust mode. Defaults to '0', indicating
// that the cursor is exhausted.
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index f0ff525e304..8fc158a7b81 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -395,6 +395,20 @@ public:
return _comment ? boost::optional<BSONElement>(_comment->firstElement()) : boost::none;
}
+ /**
+ * Sets whether this operation is an exhaust command.
+ */
+ void setExhaust(bool exhaust) {
+ _exhaust = exhaust;
+ }
+
+ /**
+ * Returns whether this operation is an exhaust command.
+ */
+ bool isExhaust() const {
+ return _exhaust;
+ }
+
private:
StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil(
stdx::condition_variable& cv, BasicLockableAdapter m, Date_t deadline) noexcept override;
@@ -528,6 +542,9 @@ private:
// If populated, this is an owned singleton BSONObj whose only field, 'comment', is a copy of
// the 'comment' field from the input command object.
boost::optional<BSONObj> _comment;
+
+ // Whether this operation is an exhaust command.
+ bool _exhaust = false;
};
namespace repl {
diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp
index f2007792326..fb86c2a74a4 100644
--- a/src/mongo/db/repl/replication_info.cpp
+++ b/src/mongo/db/repl/replication_info.cpp
@@ -75,7 +75,16 @@ using std::unique_ptr;
namespace repl {
namespace {
-void appendReplicationInfo(OperationContext* opCtx, BSONObjBuilder& result, int level) {
+/**
+ * Appends replication-related fields to the isMaster response. Returns the topology version that
+ * was included in the response.
+ * TODO SERVER-44813: Always return a topology version, including on standalones.
+ */
+boost::optional<TopologyVersion> appendReplicationInfo(OperationContext* opCtx,
+ BSONObjBuilder& result,
+ int level) {
+ TopologyVersion topologyVersion;
+
ReplicationCoordinator* replCoord = ReplicationCoordinator::get(opCtx);
if (replCoord->getSettings().usingReplSets()) {
const auto& horizonParams = SplitHorizon::getParameters(opCtx->getClient());
@@ -85,7 +94,7 @@ void appendReplicationInfo(OperationContext* opCtx, BSONObjBuilder& result, int
if (level) {
replCoord->appendSlaveInfoData(&result);
}
- return;
+ return isMasterResponse.getTopologyVersion();
}
result.appendBool("ismaster",
@@ -154,6 +163,8 @@ void appendReplicationInfo(OperationContext* opCtx, BSONObjBuilder& result, int
replCoord->appendSlaveInfoData(&result);
}
+
+ return boost::none;
}
class ReplicationInfoServerStatus : public ServerStatusSection {
@@ -213,9 +224,9 @@ public:
}
} oplogInfoServerStatus;
-class CmdIsMaster final : public BasicCommand {
+class CmdIsMaster final : public BasicCommandWithReplyBuilderInterface {
public:
- CmdIsMaster() : BasicCommand("isMaster", "ismaster") {}
+ CmdIsMaster() : BasicCommandWithReplyBuilderInterface("isMaster", "ismaster") {}
bool requiresAuth() const final {
return false;
@@ -238,10 +249,10 @@ public:
const BSONObj& cmdObj,
std::vector<Privilege>* out) const final {} // No auth required
- bool run(OperationContext* opCtx,
- const string&,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) final {
+ bool runWithReplyBuilder(OperationContext* opCtx,
+ const string&,
+ const BSONObj& cmdObj,
+ rpc::ReplyBuilderInterface* replyBuilder) final {
CommandHelpers::handleMarkKillOnClientDisconnect(opCtx);
// TODO Unwind after SERVER-41070
@@ -365,12 +376,13 @@ public:
// present if and only if topologyVersion is present in the request.
auto topologyVersionElement = cmdObj["topologyVersion"];
auto maxAwaitTimeMSField = cmdObj["maxAwaitTimeMS"];
+ boost::optional<TopologyVersion> clientTopologyVersion;
if (topologyVersionElement && maxAwaitTimeMSField) {
- auto topologyVersion = TopologyVersion::parse(IDLParserErrorContext("TopologyVersion"),
- topologyVersionElement.Obj());
+ clientTopologyVersion = TopologyVersion::parse(IDLParserErrorContext("TopologyVersion"),
+ topologyVersionElement.Obj());
uassert(31372,
"topologyVersion must have a non-negative counter",
- topologyVersion.getCounter() >= 0);
+ clientTopologyVersion->getCounter() >= 0);
long long maxAwaitTimeMS;
uassertStatusOK(bsonExtractIntegerField(cmdObj, "maxAwaitTimeMS", &maxAwaitTimeMS));
@@ -390,7 +402,8 @@ public:
!topologyVersionElement && !maxAwaitTimeMSField);
}
- appendReplicationInfo(opCtx, result, 0);
+ auto result = replyBuilder->getBodyBuilder();
+ auto currentTopologyVersion = appendReplicationInfo(opCtx, result, 0);
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
const int configServerModeNumber = 2;
@@ -432,6 +445,34 @@ public:
auto& saslMechanismRegistry = SASLServerMechanismRegistry::get(opCtx->getServiceContext());
saslMechanismRegistry.advertiseMechanismNamesForUser(opCtx, cmdObj, &result);
+ // TODO SERVER-44813: currentTopologyVersion should always be set.
+ if (opCtx->isExhaust() && currentTopologyVersion) {
+ LOG(3) << "Using exhaust for isMaster protocol";
+
+ uassert(51756,
+ "An isMaster request with exhaust must specify 'maxAwaitTimeMS'",
+ maxAwaitTimeMSField);
+ invariant(clientTopologyVersion);
+
+ if (clientTopologyVersion->getProcessId() == currentTopologyVersion->getProcessId() &&
+ clientTopologyVersion->getCounter() == currentTopologyVersion->getCounter()) {
+ // Indicate that the previous invocation should be reused for the next invocation.
+ replyBuilder->setNextInvocation(boost::none);
+ } else {
+ BSONObjBuilder nextInvocationBuilder;
+ for (auto&& elt : cmdObj) {
+ if (elt.fieldNameStringData() == "topologyVersion"_sd) {
+ BSONObjBuilder topologyVersionBuilder(
+ nextInvocationBuilder.subobjStart("topologyVersion"));
+ currentTopologyVersion->serialize(&topologyVersionBuilder);
+ } else {
+ nextInvocationBuilder.append(elt);
+ }
+ }
+ replyBuilder->setNextInvocation(nextInvocationBuilder.obj());
+ }
+ }
+
return true;
}
} cmdismaster;
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index efae06481bf..ea792fa18ae 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -1146,6 +1146,8 @@ DbResponse receivedCommands(OperationContext* opCtx,
CurOp::get(opCtx)->setLogicalOp_inlock(c->getLogicalOp());
}
+ opCtx->setExhaust(OpMsg::isFlagSet(message, OpMsg::kExhaustSupported));
+
execCommandDatabase(opCtx, c, request, replyBuilder.get(), behaviors);
} catch (const DBException& ex) {
BSONObjBuilder metadataBob;
@@ -1178,6 +1180,11 @@ DbResponse receivedCommands(OperationContext* opCtx,
DbResponse dbResponse;
+ dbResponse.shouldRunAgainForExhaust = replyBuilder->shouldRunAgainForExhaust();
+ dbResponse.nextInvocation = replyBuilder->getNextInvocation();
+
+ // TODO SERVER-44517: This block can be removed once 'exhaustNS' and 'exhaustCursorId' are
+ // removed from DbResponse.
if (OpMsg::isFlagSet(message, OpMsg::kExhaustSupported)) {
auto responseObj = replyBuilder->getBodyBuilder().asTempObj();
auto cursorObj = responseObj.getObjectField("cursor");
diff --git a/src/mongo/rpc/op_msg_integration_test.cpp b/src/mongo/rpc/op_msg_integration_test.cpp
index 8659b7014d9..40a6ee044a8 100644
--- a/src/mongo/rpc/op_msg_integration_test.cpp
+++ b/src/mongo/rpc/op_msg_integration_test.cpp
@@ -30,6 +30,7 @@
#include "mongo/platform/basic.h"
#include "mongo/client/dbclient_connection.h"
+#include "mongo/client/dbclient_rs.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/query/getmore_request.h"
#include "mongo/rpc/get_status_from_command_result.h"
@@ -288,7 +289,7 @@ void enableClientChecksum() {
failPoint->setMode(FailPoint::off);
}
-void exhaustTest(bool enableChecksum) {
+void exhaustGetMoreTest(bool enableChecksum) {
std::string errMsg;
auto conn = std::unique_ptr<DBClientBase>(
unittest::getFixtureConnectionString().connect("integration_test", errMsg));
@@ -374,12 +375,197 @@ void exhaustTest(bool enableChecksum) {
ASSERT_BSONOBJ_EQ(nextBatch[0].embeddedObject(), BSON("_id" << 4));
}
-TEST(OpMsg, ServerHandlesExhaustCorrectly) {
- exhaustTest(false);
+TEST(OpMsg, ServerHandlesExhaustGetMoreCorrectly) {
+ exhaustGetMoreTest(false);
}
-TEST(OpMsg, ServerHandlesExhaustCorrectlyWithChecksum) {
- exhaustTest(true);
+TEST(OpMsg, ServerHandlesExhaustGetMoreCorrectlyWithChecksum) {
+ exhaustGetMoreTest(true);
+}
+
+void exhaustIsMasterTest(bool enableChecksum) {
+ std::string errMsg;
+ auto fixtureConn = std::unique_ptr<DBClientBase>(
+ unittest::getFixtureConnectionString().connect("integration_test", errMsg));
+ uassert(ErrorCodes::SocketException, errMsg, fixtureConn);
+
+ // TODO SERVER-44813: Run this test on standalone.
+ // TODO SERVER-44521: Run this test on mongos.
+ if (!fixtureConn->isReplicaSetMember()) {
+ return;
+ }
+
+ // Connect directly to the primary.
+ DBClientBase* conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn();
+ ASSERT(conn);
+
+ if (!enableChecksum) {
+ disableClientChecksum();
+ }
+
+ ON_BLOCK_EXIT([&] { enableClientChecksum(); });
+
+ auto tickSource = getGlobalServiceContext()->getTickSource();
+
+ // Issue an isMaster command without a topology version.
+ auto isMasterCmd = BSON("isMaster" << 1);
+ auto opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd);
+ auto request = opMsgRequest.serialize();
+
+ Message reply;
+ ASSERT(conn->call(request, reply));
+ auto res = OpMsg::parse(reply).body;
+ ASSERT_OK(getStatusFromCommandResult(res));
+ auto topologyVersion = res["topologyVersion"].Obj().getOwned();
+ ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+ // Reply has checksum if and only if the request did.
+ ASSERT_EQ(OpMsg::isFlagSet(reply, OpMsg::kChecksumPresent), enableChecksum);
+
+ // Construct isMaster command with topologyVersion, maxAwaitTimeMS, and exhaust.
+ isMasterCmd =
+ BSON("isMaster" << 1 << "topologyVersion" << topologyVersion << "maxAwaitTimeMS" << 100);
+ opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd);
+ request = opMsgRequest.serialize();
+ OpMsg::setFlag(&request, OpMsg::kExhaustSupported);
+
+ // Run isMaster command to initiate the exhaust stream.
+ auto beforeExhaustCommand = tickSource->getTicks();
+ ASSERT(conn->call(request, reply));
+ auto afterFirstResponse = tickSource->getTicks();
+ // Allow for clock skew when testing the response time.
+ ASSERT_GT(tickSource->ticksTo<Milliseconds>(afterFirstResponse - beforeExhaustCommand),
+ Milliseconds(50));
+ ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+ ASSERT_EQ(OpMsg::isFlagSet(reply, OpMsg::kChecksumPresent), enableChecksum);
+ res = OpMsg::parse(reply).body;
+ ASSERT_OK(getStatusFromCommandResult(res));
+ auto nextTopologyVersion = res["topologyVersion"].Obj().getOwned();
+ ASSERT_BSONOBJ_EQ(topologyVersion, nextTopologyVersion);
+
+ // Receive next exhaust message.
+ auto lastRequestId = reply.header().getId();
+ ASSERT_OK(conn->recv(reply, lastRequestId));
+ auto afterSecondResponse = tickSource->getTicks();
+ // Allow for clock skew when testing the response time.
+ ASSERT_GT(tickSource->ticksTo<Milliseconds>(afterSecondResponse - afterFirstResponse),
+ Milliseconds(50));
+ ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+ ASSERT_EQ(OpMsg::isFlagSet(reply, OpMsg::kChecksumPresent), enableChecksum);
+ res = OpMsg::parse(reply).body;
+ ASSERT_OK(getStatusFromCommandResult(res));
+ nextTopologyVersion = res["topologyVersion"].Obj().getOwned();
+ ASSERT_BSONOBJ_EQ(topologyVersion, nextTopologyVersion);
+
+ // The exhaust stream would continue indefinitely.
+}
+
+TEST(OpMsg, ServerHandlesExhaustIsMasterCorrectly) {
+ exhaustIsMasterTest(false);
+}
+
+// TODO SERVER-44517: The checksum logic will be unified for exhaust commands, so we don't need to
+// test checksum for both exhaust isMaster and exhaust getMore.
+TEST(OpMsg, ServerHandlesExhaustIsMasterCorrectlyWithChecksum) {
+ exhaustIsMasterTest(true);
+}
+
+TEST(OpMsg, ServerHandlesExhaustIsMasterWithTopologyChange) {
+ std::string errMsg;
+ auto fixtureConn = std::unique_ptr<DBClientBase>(
+ unittest::getFixtureConnectionString().connect("integration_test", errMsg));
+ uassert(ErrorCodes::SocketException, errMsg, fixtureConn);
+
+ // TODO SERVER-44813: Run this test on standalone.
+ // TODO SERVER-44521: Run this test on mongos.
+ if (!fixtureConn->isReplicaSetMember()) {
+ return;
+ }
+
+ // Connect directly to the primary.
+ DBClientBase* conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn();
+ ASSERT(conn);
+
+ auto tickSource = getGlobalServiceContext()->getTickSource();
+
+ // Issue an isMaster command without a topology version.
+ auto isMasterCmd = BSON("isMaster" << 1);
+ auto opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd);
+ auto request = opMsgRequest.serialize();
+
+ Message reply;
+ ASSERT(conn->call(request, reply));
+ auto res = OpMsg::parse(reply).body;
+ ASSERT_OK(getStatusFromCommandResult(res));
+ auto topologyVersion = res["topologyVersion"].Obj().getOwned();
+ ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+
+ // Construct isMaster command with topologyVersion, maxAwaitTimeMS, and exhaust. Use a different
+ // processId for the topologyVersion so that the first response is returned immediately.
+ isMasterCmd = BSON("isMaster" << 1 << "topologyVersion"
+ << BSON("processId" << OID::gen() << "counter" << 0LL)
+ << "maxAwaitTimeMS" << 100);
+ opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd);
+ request = opMsgRequest.serialize();
+ OpMsg::setFlag(&request, OpMsg::kExhaustSupported);
+
+ // Run isMaster command to initiate the exhaust stream. The first response should be received
+ // immediately.
+ auto beforeExhaustCommand = tickSource->getTicks();
+ ASSERT(conn->call(request, reply));
+ auto afterFirstResponse = tickSource->getTicks();
+ // TODO SERVER-44514: Change the following assertion to use ASSERT_LT once the server responds
+ // immediately on a topologyVersion with a different processId.
+ // Allow for clock skew when testing the response time.
+ ASSERT_GT(tickSource->ticksTo<Milliseconds>(afterFirstResponse - beforeExhaustCommand),
+ Milliseconds(50));
+ ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+ res = OpMsg::parse(reply).body;
+ ASSERT_OK(getStatusFromCommandResult(res));
+ auto nextTopologyVersion = res["topologyVersion"].Obj().getOwned();
+ ASSERT_BSONOBJ_EQ(topologyVersion, nextTopologyVersion);
+
+ // Receive next exhaust message. The second response waits for 'maxAwaitTimeMS'.
+ auto lastRequestId = reply.header().getId();
+ ASSERT_OK(conn->recv(reply, lastRequestId));
+ auto afterSecondResponse = tickSource->getTicks();
+ // Allow for clock skew when testing the response time.
+ ASSERT_GT(tickSource->ticksTo<Milliseconds>(afterSecondResponse - afterFirstResponse),
+ Milliseconds(50));
+ ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+ res = OpMsg::parse(reply).body;
+ ASSERT_OK(getStatusFromCommandResult(res));
+ nextTopologyVersion = res["topologyVersion"].Obj().getOwned();
+ ASSERT_BSONOBJ_EQ(topologyVersion, nextTopologyVersion);
+
+ // The exhaust stream would continue indefinitely.
+}
+
+TEST(OpMsg, ServerRejectsExhaustIsMasterWithoutMaxAwaitTimeMS) {
+ std::string errMsg;
+ auto fixtureConn = std::unique_ptr<DBClientBase>(
+ unittest::getFixtureConnectionString().connect("integration_test", errMsg));
+ uassert(ErrorCodes::SocketException, errMsg, fixtureConn);
+
+ // TODO SERVER-44813: Run this test on standalone.
+ // TODO SERVER-44521: Run this test on mongos.
+ if (!fixtureConn->isReplicaSetMember()) {
+ return;
+ }
+
+ // Connect directly to the primary.
+ DBClientBase* conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn();
+ ASSERT(conn);
+
+ // Issue an isMaster command with exhaust but no maxAwaitTimeMS.
+ auto isMasterCmd = BSON("isMaster" << 1);
+ auto opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd);
+ auto request = opMsgRequest.serialize();
+ OpMsg::setFlag(&request, OpMsg::kExhaustSupported);
+
+ Message reply;
+ ASSERT(conn->call(request, reply));
+ auto res = OpMsg::parse(reply).body;
+ ASSERT_NOT_OK(getStatusFromCommandResult(res));
}
TEST(OpMsg, ExhaustWithDBClientCursorBehavesCorrectly) {
diff --git a/src/mongo/rpc/reply_builder_interface.cpp b/src/mongo/rpc/reply_builder_interface.cpp
index c8b2ba8f08c..6f4eddbbebe 100644
--- a/src/mongo/rpc/reply_builder_interface.cpp
+++ b/src/mongo/rpc/reply_builder_interface.cpp
@@ -89,5 +89,19 @@ ReplyBuilderInterface& ReplyBuilderInterface::setCommandReply(Status nonOKStatus
return setRawCommandReply(augmentReplyWithStatus(nonOKStatus, std::move(extraErrorInfo)));
}
+bool ReplyBuilderInterface::shouldRunAgainForExhaust() const {
+ return _shouldRunAgainForExhaust;
+}
+
+
+boost::optional<BSONObj> ReplyBuilderInterface::getNextInvocation() const {
+ return _nextInvocation;
+}
+
+void ReplyBuilderInterface::setNextInvocation(boost::optional<BSONObj> nextInvocation) {
+ _shouldRunAgainForExhaust = true;
+ _nextInvocation = nextInvocation;
+}
+
} // namespace rpc
} // namespace mongo
diff --git a/src/mongo/rpc/reply_builder_interface.h b/src/mongo/rpc/reply_builder_interface.h
index d7eb747fa30..c0f93503c3c 100644
--- a/src/mongo/rpc/reply_builder_interface.h
+++ b/src/mongo/rpc/reply_builder_interface.h
@@ -136,8 +136,33 @@ public:
*/
virtual void reserveBytes(const std::size_t bytes) = 0;
+ /**
+ * For exhaust commands, returns whether the command should be run again.
+ */
+ virtual bool shouldRunAgainForExhaust() const;
+
+ /**
+ * Returns the next invocation for an exhaust command. If this is boost::none, the previous
+ * invocation should be reused for the next invocation.
+ */
+ virtual boost::optional<BSONObj> getNextInvocation() const;
+
+ /**
+ * For exhaust commands, indicates that the command should be run again and sets the next
+ * invocation.
+ */
+ virtual void setNextInvocation(boost::optional<BSONObj> nextInvocation);
+
protected:
ReplyBuilderInterface() = default;
+
+private:
+ // For exhaust commands, indicates whether the command should be run again.
+ bool _shouldRunAgainForExhaust = false;
+
+ // The next invocation for an exhaust command. If this is boost::none, the previous invocation
+ // should be reused for the next invocation.
+ boost::optional<BSONObj> _nextInvocation;
};
} // namespace rpc
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index db3a8810f22..c80ed6e0956 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -873,6 +873,8 @@ DbResponse Strategy::clientCommand(OperationContext* opCtx, const Message& m) {
}
DbResponse dbResponse;
+ dbResponse.shouldRunAgainForExhaust = reply->shouldRunAgainForExhaust();
+ dbResponse.nextInvocation = reply->getNextInvocation();
if (OpMsg::isFlagSet(m, OpMsg::kExhaustSupported)) {
auto responseObj = reply->getBodyBuilder().asTempObj();
auto cursorObj = responseObj.getObjectField("cursor");
diff --git a/src/mongo/tools/bridge.cpp b/src/mongo/tools/bridge.cpp
index a709769cf6f..7a51f983fee 100644
--- a/src/mongo/tools/bridge.cpp
+++ b/src/mongo/tools/bridge.cpp
@@ -171,14 +171,6 @@ public:
return "<unknown>";
}
- void setExhaust(bool val) {
- _inExhaust = val;
- }
-
- bool inExhaust() const {
- return _inExhaust;
- }
-
void extractHostInfo(OpMsgRequest request) {
if (_seenFirstMessage)
return;
@@ -212,7 +204,6 @@ private:
PseudoRandom _prng;
boost::optional<HostAndPort> _host;
bool _seenFirstMessage = false;
- bool _inExhaust = false;
};
const transport::Session::Decoration<ProxiedConnection> ProxiedConnection::_get =
@@ -230,6 +221,18 @@ public:
};
DbResponse ServiceEntryPointBridge::handleRequest(OperationContext* opCtx, const Message& request) {
+ uassert(51754,
+ "Mongobridge does not support exhaust",
+ !OpMsg::isFlagSet(request, OpMsg::kExhaustSupported));
+
+ if (request.operation() == dbQuery) {
+ DbMessage d(request);
+ QueryMessage q(d);
+ if (q.queryOptions & QueryOption_Exhaust) {
+ uasserted(51755, "Mongobridge does not support exhaust");
+ }
+ }
+
const auto& source = opCtx->getClient()->session();
auto& dest = ProxiedConnection::get(source);
auto brCtx = BridgeContext::get();
@@ -264,25 +267,6 @@ DbResponse ServiceEntryPointBridge::handleRequest(OperationContext* opCtx, const
}
}
- if (dest.inExhaust()) {
- DbMessage dbm(request);
-
- auto response = uassertStatusOK(dest->sourceMessage());
- if (response.operation() == dbCompressed) {
- MessageCompressorManager compressorMgr;
- response = uassertStatusOK(compressorMgr.decompressMessage(response));
- }
-
- MsgData::View header = response.header();
- QueryResult::View qr = header.view2ptr();
- if (qr.getCursorId()) {
- return {std::move(response)};
- } else {
- dest.setExhaust(false);
- return {Message(), dbm.getns()};
- }
- }
-
const bool isFireAndForgetCommand = OpMsg::isFlagSet(request, OpMsg::kMoreToCome);
boost::optional<OpMsgRequest> cmdRequest;
@@ -372,22 +356,10 @@ DbResponse ServiceEntryPointBridge::handleRequest(OperationContext* opCtx, const
return {Message()};
}
- std::string exhaustNS;
- if (request.operation() == dbQuery) {
- DbMessage d(request);
- QueryMessage q(d);
- dest.setExhaust(q.queryOptions & QueryOption_Exhaust);
- if (dest.inExhaust()) {
- exhaustNS = d.getns();
- }
- } else {
- dest.setExhaust(false);
- }
-
// The original checksum won't be valid once the network layer replaces requestId. Remove it
// because the network layer re-checksums the response.
OpMsg::removeChecksum(&response);
- return {std::move(response), exhaustNS};
+ return {std::move(response)};
} else {
return {Message()};
}
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index 5f8da8368fe..68ff7e8c1d9 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -97,12 +97,9 @@ Message makeLegacyExhaustMessage(Message* m, const DbResponse& dbresponse) {
/**
* Given a request and its already generated response, checks for exhaust flags. If exhaust is
- * allowed, modifies the given request message to produce the subsequent exhaust message, and
- * modifies the response message to indicate it is part of an exhaust stream. Returns the modified
- * request message for it to be used as the subsequent, 'synthetic' exhaust request. Returns an
- * empty message if exhaust is not allowed.
- *
- * Currently only supports exhaust for 'getMore' commands.
+ * allowed, produces the subsequent request message, and modifies the response message to indicate
+ * it is part of an exhaust stream. Returns the subsequent request message, which is known as a
+ * 'synthetic' exhaust request. Returns an empty message if exhaust is not allowed.
*/
Message makeExhaustMessage(Message requestMsg, DbResponse* dbresponse) {
if (requestMsg.operation() == dbQuery) {
@@ -113,6 +110,44 @@ Message makeExhaustMessage(Message requestMsg, DbResponse* dbresponse) {
return Message();
}
+ const bool checksumPresent = OpMsg::isFlagSet(requestMsg, OpMsg::kChecksumPresent);
+ if (dbresponse->shouldRunAgainForExhaust) {
+ Message exhaustMessage;
+
+ if (auto nextInvocation = dbresponse->nextInvocation) {
+ // The command provided a new BSONObj for the next invocation.
+ OpMsgBuilder builder;
+ builder.setBody(*nextInvocation);
+ exhaustMessage = builder.finish();
+ } else {
+ // Reuse the previous invocation for the next invocation.
+ OpMsg::removeChecksum(&requestMsg);
+ exhaustMessage = requestMsg;
+ }
+
+ // The id of the response is used as the request id of this 'synthetic' request. Re-checksum
+ // if needed.
+ exhaustMessage.header().setId(dbresponse->response.header().getId());
+ exhaustMessage.header().setResponseToMsgId(
+ dbresponse->response.header().getResponseToMsgId());
+ OpMsg::setFlag(&exhaustMessage, OpMsg::kExhaustSupported);
+ if (checksumPresent) {
+ OpMsg::appendChecksum(&exhaustMessage);
+ }
+
+ OpMsg::removeChecksum(&dbresponse->response);
+ // Indicate that the response is part of an exhaust stream. Re-checksum if needed.
+ OpMsg::setFlag(&dbresponse->response, OpMsg::kMoreToCome);
+ if (checksumPresent) {
+ OpMsg::appendChecksum(&dbresponse->response);
+ }
+
+ return exhaustMessage;
+ }
+
+ // TODO SERVER-44517: Everything below this line should go away, and we should return Message(),
+ // since the command did not set a next invocation.
+
// Only support exhaust for 'getMore' commands.
auto request = OpMsgRequest::parse(requestMsg);
if (request.getCommandName() != "getMore"_sd) {
@@ -125,7 +160,6 @@ Message makeExhaustMessage(Message requestMsg, DbResponse* dbresponse) {
return Message();
}
- const bool checksumPresent = OpMsg::isFlagSet(requestMsg, OpMsg::kChecksumPresent);
OpMsg::removeChecksum(&dbresponse->response);
// Indicate that the response is part of an exhaust stream. Re-checksum if needed.
OpMsg::setFlag(&dbresponse->response, OpMsg::kMoreToCome);