summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTess Avitabile <tess.avitabile@mongodb.com>2020-01-22 21:22:46 +0000
committerevergreen <evergreen@mongodb.com>2020-01-22 21:22:46 +0000
commit549e45237ae39be00ea7a298e36937ee8206020c (patch)
treeefba8b0639ccbcc01791405b6438bc14ecc7bb8e
parent02d4981e6e9d9fbdb89a76dcaf55b66205d04f09 (diff)
downloadmongo-549e45237ae39be00ea7a298e36937ee8206020c.tar.gz
SERVER-44521 Implement exhaust isMaster for mongos
-rw-r--r--src/mongo/rpc/op_msg_integration_test.cpp36
-rw-r--r--src/mongo/s/commands/cluster_is_master_cmd.cpp54
-rw-r--r--src/mongo/s/commands/strategy.cpp2
3 files changed, 65 insertions, 27 deletions
diff --git a/src/mongo/rpc/op_msg_integration_test.cpp b/src/mongo/rpc/op_msg_integration_test.cpp
index 2009a609876..2e5cdad314b 100644
--- a/src/mongo/rpc/op_msg_integration_test.cpp
+++ b/src/mongo/rpc/op_msg_integration_test.cpp
@@ -526,16 +526,18 @@ TEST(OpMsg, ServerHandlesExhaustIsMasterCorrectly) {
auto fixtureConn = std::unique_ptr<DBClientBase>(
unittest::getFixtureConnectionString().connect("integration_test", errMsg));
uassert(ErrorCodes::SocketException, errMsg, fixtureConn);
+ DBClientBase* conn = fixtureConn.get();
// TODO SERVER-44813: Run this test on standalone.
- // TODO SERVER-44521: Run this test on mongos.
- if (!fixtureConn->isReplicaSetMember()) {
+ if (!fixtureConn->isReplicaSetMember() && !fixtureConn->isMongos()) {
return;
}
- // Connect directly to the primary.
- DBClientBase* conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn();
- ASSERT(conn);
+ if (fixtureConn->isReplicaSetMember()) {
+ // Connect directly to the primary.
+ conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn();
+ ASSERT(conn);
+ }
auto tickSource = getGlobalServiceContext()->getTickSource();
@@ -592,16 +594,18 @@ TEST(OpMsg, ServerHandlesExhaustIsMasterWithTopologyChange) {
auto fixtureConn = std::unique_ptr<DBClientBase>(
unittest::getFixtureConnectionString().connect("integration_test", errMsg));
uassert(ErrorCodes::SocketException, errMsg, fixtureConn);
+ DBClientBase* conn = fixtureConn.get();
// TODO SERVER-44813: Run this test on standalone.
- // TODO SERVER-44521: Run this test on mongos.
- if (!fixtureConn->isReplicaSetMember()) {
+ if (!fixtureConn->isReplicaSetMember() && !fixtureConn->isMongos()) {
return;
}
- // Connect directly to the primary.
- DBClientBase* conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn();
- ASSERT(conn);
+ if (fixtureConn->isReplicaSetMember()) {
+ // Connect directly to the primary.
+ conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn();
+ ASSERT(conn);
+ }
auto tickSource = getGlobalServiceContext()->getTickSource();
@@ -661,16 +665,18 @@ TEST(OpMsg, ServerRejectsExhaustIsMasterWithoutMaxAwaitTimeMS) {
auto fixtureConn = std::unique_ptr<DBClientBase>(
unittest::getFixtureConnectionString().connect("integration_test", errMsg));
uassert(ErrorCodes::SocketException, errMsg, fixtureConn);
+ DBClientBase* conn = fixtureConn.get();
// TODO SERVER-44813: Run this test on standalone.
- // TODO SERVER-44521: Run this test on mongos.
- if (!fixtureConn->isReplicaSetMember()) {
+ if (!fixtureConn->isReplicaSetMember() && !fixtureConn->isMongos()) {
return;
}
- // Connect directly to the primary.
- DBClientBase* conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn();
- ASSERT(conn);
+ if (fixtureConn->isReplicaSetMember()) {
+ // Connect directly to the primary.
+ conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn();
+ ASSERT(conn);
+ }
// Issue an isMaster command with exhaust but no maxAwaitTimeMS.
auto isMasterCmd = BSON("isMaster" << 1);
diff --git a/src/mongo/s/commands/cluster_is_master_cmd.cpp b/src/mongo/s/commands/cluster_is_master_cmd.cpp
index ccd0d7d00d5..7432b5b6e33 100644
--- a/src/mongo/s/commands/cluster_is_master_cmd.cpp
+++ b/src/mongo/s/commands/cluster_is_master_cmd.cpp
@@ -60,9 +60,9 @@ MONGO_INITIALIZER(GenerateMongosTopologyVersion)(InitializerContext*) {
namespace {
-class CmdIsMaster : public BasicCommand {
+class CmdIsMaster : public BasicCommandWithReplyBuilderInterface {
public:
- CmdIsMaster() : BasicCommand("isMaster", "ismaster") {}
+ CmdIsMaster() : BasicCommandWithReplyBuilderInterface("isMaster", "ismaster") {}
bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
@@ -86,10 +86,10 @@ public:
return false;
}
- bool run(OperationContext* opCtx,
- const std::string& dbname,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) override {
+ bool runWithReplyBuilder(OperationContext* opCtx,
+ const std::string& dbname,
+ const BSONObj& cmdObj,
+ rpc::ReplyBuilderInterface* replyBuilder) final {
CommandHelpers::handleMarkKillOnClientDisconnect(opCtx);
waitInIsMaster.pauseWhileSet(opCtx);
@@ -127,12 +127,13 @@ public:
// present if and only if topologyVersion is present in the request.
auto topologyVersionElement = cmdObj["topologyVersion"];
auto maxAwaitTimeMSField = cmdObj["maxAwaitTimeMS"];
+ boost::optional<TopologyVersion> clientTopologyVersion;
if (topologyVersionElement && maxAwaitTimeMSField) {
- auto clientTopologyVersion = TopologyVersion::parse(
- IDLParserErrorContext("TopologyVersion"), topologyVersionElement.Obj());
+ clientTopologyVersion = TopologyVersion::parse(IDLParserErrorContext("TopologyVersion"),
+ topologyVersionElement.Obj());
uassert(51758,
"topologyVersion must have a non-negative counter",
- clientTopologyVersion.getCounter() >= 0);
+ clientTopologyVersion->getCounter() >= 0);
long long maxAwaitTimeMS;
uassertStatusOK(bsonExtractIntegerField(cmdObj, "maxAwaitTimeMS", &maxAwaitTimeMS));
@@ -140,14 +141,14 @@ public:
LOG(3) << "Using maxAwaitTimeMS for awaitable isMaster protocol.";
- if (clientTopologyVersion.getProcessId() == mongosTopologyVersion.getProcessId()) {
+ if (clientTopologyVersion->getProcessId() == mongosTopologyVersion.getProcessId()) {
uassert(51761,
str::stream()
<< "Received a topology version with counter: "
- << clientTopologyVersion.getCounter()
+ << clientTopologyVersion->getCounter()
<< " which is greater than the mongos topology version counter: "
<< mongosTopologyVersion.getCounter(),
- clientTopologyVersion.getCounter() == mongosTopologyVersion.getCounter());
+ clientTopologyVersion->getCounter() == mongosTopologyVersion.getCounter());
// The topologyVersion never changes on a running mongos process, so just sleep for
// maxAwaitTimeMS.
@@ -161,6 +162,7 @@ public:
!topologyVersionElement && !maxAwaitTimeMSField);
}
+ auto result = replyBuilder->getBodyBuilder();
result.appendBool("ismaster", true);
result.append("msg", "isdbgrid");
result.appendNumber("maxBsonObjectSize", BSONObjMaxUserSize);
@@ -190,6 +192,34 @@ public:
BSONObjBuilder topologyVersionBuilder(result.subobjStart("topologyVersion"));
mongosTopologyVersion.serialize(&topologyVersionBuilder);
+ if (opCtx->isExhaust()) {
+ LOG(3) << "Using exhaust for isMaster protocol";
+
+ uassert(51763,
+ "An isMaster request with exhaust must specify 'maxAwaitTimeMS'",
+ maxAwaitTimeMSField);
+ invariant(clientTopologyVersion);
+
+ if (clientTopologyVersion->getProcessId() == mongosTopologyVersion.getProcessId() &&
+ clientTopologyVersion->getCounter() == mongosTopologyVersion.getCounter()) {
+ // Indicate that an exhaust message should be generated and the previous BSONObj
+ // command parameters should be reused as the next BSONObj command parameters.
+ replyBuilder->setNextInvocation(boost::none);
+ } else {
+ BSONObjBuilder nextInvocationBuilder;
+ for (auto&& elt : cmdObj) {
+ if (elt.fieldNameStringData() == "topologyVersion"_sd) {
+ BSONObjBuilder topologyVersionBuilder(
+ nextInvocationBuilder.subobjStart("topologyVersion"));
+ mongosTopologyVersion.serialize(&topologyVersionBuilder);
+ } else {
+ nextInvocationBuilder.append(elt);
+ }
+ }
+ replyBuilder->setNextInvocation(nextInvocationBuilder.obj());
+ }
+ }
+
return true;
}
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index b138a8ae36e..eef03a45db3 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -849,6 +849,8 @@ DbResponse Strategy::clientCommand(OperationContext* opCtx, const Message& m) {
}
}();
+ opCtx->setExhaust(OpMsg::isFlagSet(m, OpMsg::kExhaustSupported));
+
// Execute.
std::string db = request.getDatabase().toString();
try {