diff options
-rw-r--r-- | src/mongo/rpc/op_msg_integration_test.cpp | 36 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_is_master_cmd.cpp | 54 | ||||
-rw-r--r-- | src/mongo/s/commands/strategy.cpp | 2 |
3 files changed, 65 insertions, 27 deletions
diff --git a/src/mongo/rpc/op_msg_integration_test.cpp b/src/mongo/rpc/op_msg_integration_test.cpp index 2009a609876..2e5cdad314b 100644 --- a/src/mongo/rpc/op_msg_integration_test.cpp +++ b/src/mongo/rpc/op_msg_integration_test.cpp @@ -526,16 +526,18 @@ TEST(OpMsg, ServerHandlesExhaustIsMasterCorrectly) { auto fixtureConn = std::unique_ptr<DBClientBase>( unittest::getFixtureConnectionString().connect("integration_test", errMsg)); uassert(ErrorCodes::SocketException, errMsg, fixtureConn); + DBClientBase* conn = fixtureConn.get(); // TODO SERVER-44813: Run this test on standalone. - // TODO SERVER-44521: Run this test on mongos. - if (!fixtureConn->isReplicaSetMember()) { + if (!fixtureConn->isReplicaSetMember() && !fixtureConn->isMongos()) { return; } - // Connect directly to the primary. - DBClientBase* conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn(); - ASSERT(conn); + if (fixtureConn->isReplicaSetMember()) { + // Connect directly to the primary. + conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn(); + ASSERT(conn); + } auto tickSource = getGlobalServiceContext()->getTickSource(); @@ -592,16 +594,18 @@ TEST(OpMsg, ServerHandlesExhaustIsMasterWithTopologyChange) { auto fixtureConn = std::unique_ptr<DBClientBase>( unittest::getFixtureConnectionString().connect("integration_test", errMsg)); uassert(ErrorCodes::SocketException, errMsg, fixtureConn); + DBClientBase* conn = fixtureConn.get(); // TODO SERVER-44813: Run this test on standalone. - // TODO SERVER-44521: Run this test on mongos. - if (!fixtureConn->isReplicaSetMember()) { + if (!fixtureConn->isReplicaSetMember() && !fixtureConn->isMongos()) { return; } - // Connect directly to the primary. - DBClientBase* conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn(); - ASSERT(conn); + if (fixtureConn->isReplicaSetMember()) { + // Connect directly to the primary. + conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn(); + ASSERT(conn); + } auto tickSource = getGlobalServiceContext()->getTickSource(); @@ -661,16 +665,18 @@ TEST(OpMsg, ServerRejectsExhaustIsMasterWithoutMaxAwaitTimeMS) { auto fixtureConn = std::unique_ptr<DBClientBase>( unittest::getFixtureConnectionString().connect("integration_test", errMsg)); uassert(ErrorCodes::SocketException, errMsg, fixtureConn); + DBClientBase* conn = fixtureConn.get(); // TODO SERVER-44813: Run this test on standalone. - // TODO SERVER-44521: Run this test on mongos. - if (!fixtureConn->isReplicaSetMember()) { + if (!fixtureConn->isReplicaSetMember() && !fixtureConn->isMongos()) { return; } - // Connect directly to the primary. - DBClientBase* conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn(); - ASSERT(conn); + if (fixtureConn->isReplicaSetMember()) { + // Connect directly to the primary. + conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn(); + ASSERT(conn); + } // Issue an isMaster command with exhaust but no maxAwaitTimeMS. auto isMasterCmd = BSON("isMaster" << 1); diff --git a/src/mongo/s/commands/cluster_is_master_cmd.cpp b/src/mongo/s/commands/cluster_is_master_cmd.cpp index ccd0d7d00d5..7432b5b6e33 100644 --- a/src/mongo/s/commands/cluster_is_master_cmd.cpp +++ b/src/mongo/s/commands/cluster_is_master_cmd.cpp @@ -60,9 +60,9 @@ MONGO_INITIALIZER(GenerateMongosTopologyVersion)(InitializerContext*) { namespace { -class CmdIsMaster : public BasicCommand { +class CmdIsMaster : public BasicCommandWithReplyBuilderInterface { public: - CmdIsMaster() : BasicCommand("isMaster", "ismaster") {} + CmdIsMaster() : BasicCommandWithReplyBuilderInterface("isMaster", "ismaster") {} bool supportsWriteConcern(const BSONObj& cmd) const override { return false; @@ -86,10 +86,10 @@ public: return false; } - bool run(OperationContext* opCtx, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder& result) override { + bool runWithReplyBuilder(OperationContext* opCtx, + const std::string& dbname, + const BSONObj& cmdObj, + rpc::ReplyBuilderInterface* replyBuilder) final { CommandHelpers::handleMarkKillOnClientDisconnect(opCtx); waitInIsMaster.pauseWhileSet(opCtx); @@ -127,12 +127,13 @@ public: // present if and only if topologyVersion is present in the request. auto topologyVersionElement = cmdObj["topologyVersion"]; auto maxAwaitTimeMSField = cmdObj["maxAwaitTimeMS"]; + boost::optional<TopologyVersion> clientTopologyVersion; if (topologyVersionElement && maxAwaitTimeMSField) { - auto clientTopologyVersion = TopologyVersion::parse( - IDLParserErrorContext("TopologyVersion"), topologyVersionElement.Obj()); + clientTopologyVersion = TopologyVersion::parse(IDLParserErrorContext("TopologyVersion"), + topologyVersionElement.Obj()); uassert(51758, "topologyVersion must have a non-negative counter", - clientTopologyVersion.getCounter() >= 0); + clientTopologyVersion->getCounter() >= 0); long long maxAwaitTimeMS; uassertStatusOK(bsonExtractIntegerField(cmdObj, "maxAwaitTimeMS", &maxAwaitTimeMS)); @@ -140,14 +141,14 @@ public: LOG(3) << "Using maxAwaitTimeMS for awaitable isMaster protocol."; - if (clientTopologyVersion.getProcessId() == mongosTopologyVersion.getProcessId()) { + if (clientTopologyVersion->getProcessId() == mongosTopologyVersion.getProcessId()) { uassert(51761, str::stream() << "Received a topology version with counter: " - << clientTopologyVersion.getCounter() + << clientTopologyVersion->getCounter() << " which is greater than the mongos topology version counter: " << mongosTopologyVersion.getCounter(), - clientTopologyVersion.getCounter() == mongosTopologyVersion.getCounter()); + clientTopologyVersion->getCounter() == mongosTopologyVersion.getCounter()); // The topologyVersion never changes on a running mongos process, so just sleep for // maxAwaitTimeMS. @@ -161,6 +162,7 @@ public: !topologyVersionElement && !maxAwaitTimeMSField); } + auto result = replyBuilder->getBodyBuilder(); result.appendBool("ismaster", true); result.append("msg", "isdbgrid"); result.appendNumber("maxBsonObjectSize", BSONObjMaxUserSize); @@ -190,6 +192,34 @@ public: BSONObjBuilder topologyVersionBuilder(result.subobjStart("topologyVersion")); mongosTopologyVersion.serialize(&topologyVersionBuilder); + if (opCtx->isExhaust()) { + LOG(3) << "Using exhaust for isMaster protocol"; + + uassert(51763, + "An isMaster request with exhaust must specify 'maxAwaitTimeMS'", + maxAwaitTimeMSField); + invariant(clientTopologyVersion); + + if (clientTopologyVersion->getProcessId() == mongosTopologyVersion.getProcessId() && + clientTopologyVersion->getCounter() == mongosTopologyVersion.getCounter()) { + // Indicate that an exhaust message should be generated and the previous BSONObj + // command parameters should be reused as the next BSONObj command parameters. + replyBuilder->setNextInvocation(boost::none); + } else { + BSONObjBuilder nextInvocationBuilder; + for (auto&& elt : cmdObj) { + if (elt.fieldNameStringData() == "topologyVersion"_sd) { + BSONObjBuilder topologyVersionBuilder( + nextInvocationBuilder.subobjStart("topologyVersion")); + mongosTopologyVersion.serialize(&topologyVersionBuilder); + } else { + nextInvocationBuilder.append(elt); + } + } + replyBuilder->setNextInvocation(nextInvocationBuilder.obj()); + } + } + return true; } diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index b138a8ae36e..eef03a45db3 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -849,6 +849,8 @@ DbResponse Strategy::clientCommand(OperationContext* opCtx, const Message& m) { } }(); + opCtx->setExhaust(OpMsg::isFlagSet(m, OpMsg::kExhaustSupported)); + // Execute. std::string db = request.getDatabase().toString(); try { |