diff options
author | Mathias Stearn <mathias@10gen.com> | 2017-04-12 16:52:28 -0400 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2017-05-12 12:10:09 -0400 |
commit | bf96ba7825a92e2fb0137523c92bfa6a862abe68 (patch) | |
tree | e6ff0031075935100bec6ef3e80d98cd8594576f /src/mongo | |
parent | f2902d59175c0724944ca98d13f784e2de944053 (diff) | |
download | mongo-bf96ba7825a92e2fb0137523c92bfa6a862abe68.tar.gz |
SERVER-28816 OP_MSG for mongos ingress layer
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/commands.h | 4 | ||||
-rw-r--r-- | src/mongo/db/commands_helpers.h | 2 | ||||
-rw-r--r-- | src/mongo/rpc/protocol.cpp | 1 | ||||
-rw-r--r-- | src/mongo/s/commands/strategy.cpp | 193 | ||||
-rw-r--r-- | src/mongo/s/commands/strategy.h | 11 | ||||
-rw-r--r-- | src/mongo/s/service_entry_point_mongos.cpp | 14 |
6 files changed, 141 insertions, 84 deletions
diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h index 780af5827cc..f169f651e6f 100644 --- a/src/mongo/db/commands.h +++ b/src/mongo/db/commands.h @@ -471,7 +471,8 @@ public: */ static bool isGenericArgument(StringData arg) { // Not including "help" since we don't pass help requests through to the command parser. - // If that changes, it should be added. + // If that changes, it should be added. When you add to this list, consider whether you + // should also change the filterCommandRequestForPassthrough() function in sharding. return arg == "$audit" || // arg == "$client" || // arg == "$configServerState" || // @@ -526,7 +527,6 @@ private: friend void mongo::execCommandClient(OperationContext* opCtx, Command* c, - int queryOptions, StringData dbname, BSONObj& cmdObj, BSONObjBuilder& result); diff --git a/src/mongo/db/commands_helpers.h b/src/mongo/db/commands_helpers.h index 5880c184cdf..93d278637fa 100644 --- a/src/mongo/db/commands_helpers.h +++ b/src/mongo/db/commands_helpers.h @@ -33,6 +33,7 @@ class OperationContext; class Command; class BSONObj; class BSONObjBuilder; +class StringData; namespace rpc { class RequestInterface; @@ -47,7 +48,6 @@ class ReplyBuilderInterface; // Implemented in `src/mongo/s/s_only.cpp`. void execCommandClient(OperationContext* opCtx, Command* c, - int queryOptions, StringData dbname, BSONObj& cmdObj, BSONObjBuilder& result); diff --git a/src/mongo/rpc/protocol.cpp b/src/mongo/rpc/protocol.cpp index 5799b0bcc7d..02d54c5db55 100644 --- a/src/mongo/rpc/protocol.cpp +++ b/src/mongo/rpc/protocol.cpp @@ -154,7 +154,6 @@ StatusWith<ProtocolSetAndWireVersionInfo> parseProtocolSetFromIsMasterReply( if (isMongos) { // Remove support for protocols that mongos doesn't support. protos &= ~supports::kOpCommandOnly; - protos &= ~supports::kOpMsgOnly; // TODO remove this line once mongos supports OP_MSG. } return {{protos, version}}; diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 11cd2ef1381..d2251b6b015 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -71,6 +71,7 @@ #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" +#include "mongo/util/net/op_msg.h" #include "mongo/util/scopeguard.h" #include "mongo/util/timer.h" @@ -87,18 +88,11 @@ namespace { const std::string kOperationTime = "operationTime"; void runAgainstRegistered(OperationContext* opCtx, - const NamespaceString& nss, + StringData db, BSONObj& jsobj, - BSONObjBuilder& anObjBuilder, - int queryOptions) { - // It should be impossible for this uassert to fail since there should be no way to get - // into this function with any other collection name. - uassert(16618, - "Illegal attempt to run a command against a namespace other than $cmd.", - nss.isCommand()); - + BSONObjBuilder& anObjBuilder) { BSONElement e = jsobj.firstElement(); - std::string commandName = e.fieldName(); + const auto commandName = e.fieldNameStringData(); Command* c = e.type() ? Command::findCommand(commandName) : NULL; if (!c) { Command::appendCommandStatus( @@ -108,7 +102,7 @@ void runAgainstRegistered(OperationContext* opCtx, return; } - execCommandClient(opCtx, c, queryOptions, nss.db(), jsobj, anObjBuilder); + execCommandClient(opCtx, c, db, jsobj, anObjBuilder); } /** @@ -125,13 +119,8 @@ void execCommandHandler(OperationContext* opCtx, std::tie(cmdObj, queryFlags) = rpc::downconvertRequestMetadata(request.getCommandArgs(), request.getMetadata()); - std::string db = request.getDatabase().toString(); - uassert(ErrorCodes::InvalidNamespace, - str::stream() << "Invalid database name: '" << db << "'", - NamespaceString::validDBName(db, NamespaceString::DollarInDbNameBehavior::Allow)); - BSONObjBuilder result; - execCommandClient(opCtx, command, queryFlags, db, cmdObj, result); + execCommandClient(opCtx, command, request.getDatabase(), cmdObj, result); replyBuilder->setCommandReply(result.done()).setMetadata(rpc::makeEmptyMetadata()); } @@ -140,6 +129,9 @@ void execCommandHandler(OperationContext* opCtx, * Extract and process metadata from the command request body. */ Status processCommandMetadata(OperationContext* opCtx, const BSONObj& cmdObj) { + ReadPreferenceSetting::get(opCtx) = + uassertStatusOK(ReadPreferenceSetting::fromContainingBSON(cmdObj)); + auto logicalClock = LogicalClock::get(opCtx); invariant(logicalClock); @@ -283,12 +275,17 @@ DbResponse Strategy::queryOp(OperationContext* opCtx, const NamespaceString& nss cursorId.getValue())}; } -DbResponse Strategy::clientCommandOp(OperationContext* opCtx, - const NamespaceString& nss, - DbMessage* dbm) { +static void runCommand(OperationContext* opCtx, + StringData db, + BSONObj cmdObj, + BSONObjBuilder&& builder); + +DbResponse Strategy::clientOpQueryCommand(OperationContext* opCtx, + NamespaceString nss, + DbMessage* dbm) { const QueryMessage q(*dbm); - LOG(3) << "command: " << q.ns << " " << redact(q.query) << " ntoreturn: " << q.ntoreturn + LOG(3) << "command: " << nss << " " << redact(q.query) << " ntoreturn: " << q.ntoreturn << " options: " << q.queryOptions; if (q.queryOptions & QueryOption_Exhaust) { @@ -304,36 +301,32 @@ DbResponse Strategy::clientCommandOp(OperationContext* opCtx, << ") for $cmd type ns - can only be 1 or -1", q.ntoreturn == 1 || q.ntoreturn == -1); + BSONObj cmdObj = q.query; + // Handle the $cmd.sys pseudo-commands if (nss.isSpecialCommand()) { const auto upgradeToRealCommand = [&](StringData commandName) { BSONObjBuilder cmdBob; cmdBob.append(commandName, 1); - cmdBob.appendElements(q.query); // fields are validated by Commands - auto interposedCmd = cmdBob.done(); - - // Rewrite upgraded pseudoCommands to run on the 'admin' database. - const NamespaceString interposedNss("admin", "$cmd"); - BSONObjBuilder reply; - runAgainstRegistered(opCtx, interposedNss, interposedCmd, reply, q.queryOptions); - return replyToQuery(reply.done()); + cmdBob.appendElements(cmdObj); // fields are validated by Commands + return cmdBob.obj(); }; if (nss.coll() == "$cmd.sys.inprog") { - return upgradeToRealCommand("currentOp"); + cmdObj = upgradeToRealCommand("currentOp"); } else if (nss.coll() == "$cmd.sys.killop") { - return upgradeToRealCommand("killOp"); + cmdObj = upgradeToRealCommand("killOp"); } else if (nss.coll() == "$cmd.sys.unlock") { - return replyToQuery(BSON("$err" - << "can't do unlock through mongos"), - ResultFlag_ErrSet); + uasserted(40442, "can't do unlock through mongos"); + } else { + uasserted(40443, str::stream() << "unknown psuedo-command namespace " << nss.ns()); } - // No pseudo-command found, fall through to execute as a regular query + // These commands must be run against the admin db even though the psuedo commands + // ignored the db. + nss = NamespaceString("admin", "$cmd"); } - BSONObj cmdObj = q.query; - { bool haveReadPref = false; BSONElement e = cmdObj.firstElement(); @@ -342,19 +335,11 @@ DbResponse Strategy::clientCommandOp(OperationContext* opCtx, // Extract the embedded query object. if (auto readPrefElem = cmdObj[Query::ReadPrefField.name()]) { // The command has a read preference setting. We don't want to lose this information - // so we put it on the OperationContext and copy it to a new field called - // $queryOptions.$readPreference - ReadPreferenceSetting::get(opCtx) = - uassertStatusOK(ReadPreferenceSetting::fromInnerBSON(readPrefElem)); - + // so we copy it to a new field called $queryOptions.$readPreference haveReadPref = true; BSONObjBuilder finalCmdObjBuilder; finalCmdObjBuilder.appendElements(e.embeddedObject()); - - BSONObjBuilder queryOptionsBuilder(finalCmdObjBuilder.subobjStart("$queryOptions")); - queryOptionsBuilder.append(readPrefElem); - queryOptionsBuilder.done(); - + finalCmdObjBuilder.append(readPrefElem); cmdObj = finalCmdObjBuilder.obj(); } else { cmdObj = e.embeddedObject(); @@ -365,19 +350,22 @@ DbResponse Strategy::clientCommandOp(OperationContext* opCtx, // If the slaveOK bit is set, behave as-if read preference secondary-preferred was // specified. const auto readPref = ReadPreferenceSetting(ReadPreference::SecondaryPreferred); - ReadPreferenceSetting::get(opCtx) = readPref; - BSONObjBuilder finalCmdObjBuilder; finalCmdObjBuilder.appendElements(cmdObj); - - BSONObjBuilder queryOptionsBuilder(finalCmdObjBuilder.subobjStart("$queryOptions")); - readPref.toContainingBSON(&queryOptionsBuilder); - queryOptionsBuilder.doneFast(); - + readPref.toContainingBSON(&finalCmdObjBuilder); cmdObj = finalCmdObjBuilder.obj(); } } + OpQueryReplyBuilder reply; + runCommand(opCtx, nss.db(), cmdObj, BSONObjBuilder(reply.bufBuilderForResults())); + return DbResponse{reply.toCommandReply()}; +} + +static void runCommand(OperationContext* opCtx, + StringData db, + BSONObj cmdObj, + BSONObjBuilder&& builder) { // Handle command option maxTimeMS. uassert(ErrorCodes::InvalidOptions, "no such command option $maxTimeMs; use maxTimeMS instead", @@ -392,23 +380,21 @@ DbResponse Strategy::clientCommandOp(OperationContext* opCtx, int loops = 5; while (true) { + builder.resetToEmpty(); try { - OpQueryReplyBuilder reply; - { - BSONObjBuilder builder(reply.bufBuilderForResults()); - runAgainstRegistered(opCtx, NamespaceString(q.ns), cmdObj, builder, q.queryOptions); - } - return DbResponse{reply.toCommandReply()}; + runAgainstRegistered(opCtx, db, cmdObj, builder); + return; } catch (const StaleConfigException& e) { if (loops <= 0) throw e; loops--; - log() << "Retrying command " << redact(q.query) << causedBy(e); + log() << "Retrying command " << redact(cmdObj) << causedBy(e); // For legacy reasons, ns may not actually be set in the exception - const std::string staleNS(e.getns().empty() ? std::string(q.ns) : e.getns()); + const std::string staleNS(e.getns().empty() ? NamespaceString(db).getCommandNS().ns() + : e.getns()); ShardConnection::checkMyConnectionVersions(opCtx, staleNS); if (loops < 4) { @@ -417,17 +403,36 @@ DbResponse Strategy::clientCommandOp(OperationContext* opCtx, Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNSS); } } + continue; } catch (const DBException& e) { - OpQueryReplyBuilder reply; - { - BSONObjBuilder builder(reply.bufBuilderForResults()); - Command::appendCommandStatus(builder, e.toStatus()); - } - return DbResponse{reply.toCommandReply()}; + builder.resetToEmpty(); + Command::appendCommandStatus(builder, e.toStatus()); + return; } + MONGO_UNREACHABLE; } } +DbResponse Strategy::clientOpMsgCommand(OperationContext* opCtx, const Message& m) { + auto request = OpMsg::parse(m); + OpMsgBuilder reply; + try { + std::string db = "admin"; + if (auto elem = request.body["$db"]) + db = elem.String(); + + runCommand(opCtx, db, request.body, reply.beginBody()); + } catch (const DBException& ex) { + reply.reset(); + auto bob = reply.beginBody(); + Command::appendCommandStatus(bob, ex.toStatus()); + } + + if (request.isFlagSet(OpMsg::kMoreToCome)) + return {}; + return DbResponse{reply.finish()}; +} + void Strategy::commandOp(OperationContext* opCtx, const string& db, const BSONObj& command, @@ -580,12 +585,11 @@ void Strategy::writeOp(OperationContext* opCtx, DbMessage* dbm) { // Adjust namespace for command const NamespaceString& fullNS(commandRequest->getNS()); - const NamespaceString& cmdNS = fullNS.getCommandNS(); BSONObj commandBSON = commandRequest->toBSON(); BSONObjBuilder builder; - runAgainstRegistered(opCtx, cmdNS, commandBSON, builder, 0); + runAgainstRegistered(opCtx, fullNS.db(), commandBSON, builder); bool parsed = commandResponse.parseBSON(builder.done(), nullptr); (void)parsed; // for compile @@ -651,18 +655,60 @@ Status Strategy::explainFind(OperationContext* opCtx, out); } + +namespace { +/** + * Rewrites cmdObj into the format expected by mongos Command::run() implementations. + * + * This performs 2 transformations: + * 1) $readPreference fields are moved into a subobject called $queryOptions. This matches the + * "wrapped" format historically used internally by mongos. Moving off of that format will be + * done as SERVER-29091. + * + * 2) Filter out generic arguments that shouldn't be blindly passed to the shards. This is + * necessary because many mongos implementations of Command::run() just pass cmdObj through + * directly to the shards. However, some of the generic arguments fields are automatically + * appended in the egress layer. Removing them here ensures that they don't get duplicated. + * + * Ideally this function can be deleted once mongos run() implementations are more careful about + * what they send to the shards. + */ +BSONObj filterCommandRequestForPassthrough(const BSONObj& cmdObj) { + BSONObjBuilder bob; + for (auto elem : cmdObj) { + const auto name = elem.fieldNameStringData(); + if (name == "$readPreference") { + BSONObjBuilder(bob.subobjStart("$queryOptions")).append(elem); + } else if (!Command::isGenericArgument(name) || name == "maxTimeMS" || + name == "readConcern" || name == "writeConcern") { + // This is the whitelist of generic arguments that commands can be trusted to blindly + // forward to the shards. + bob.append(elem); + } + } + return bob.obj(); +} +} + /** * Called into by the commands infrastructure. */ void execCommandClient(OperationContext* opCtx, Command* c, - int queryOptions, StringData dbname, BSONObj& cmdObj, BSONObjBuilder& result) { ON_BLOCK_EXIT([opCtx, &result] { appendRequiredFieldsToResponse(opCtx, &result); }); + uassert(ErrorCodes::IllegalOperation, + "Can't use 'local' database through mongos", + dbname != NamespaceString::kLocalDb); + + uassert(ErrorCodes::InvalidNamespace, + str::stream() << "Invalid database name: '" << dbname << "'", + NamespaceString::validDBName(dbname, NamespaceString::DollarInDbNameBehavior::Allow)); + dassert(dbname == nsToDatabase(dbname)); StringMap<int> topLevelFields; for (auto&& element : cmdObj) { @@ -722,18 +768,19 @@ void execCommandClient(OperationContext* opCtx, return; } + auto filteredCmdObj = filterCommandRequestForPassthrough(cmdObj); std::string errmsg; bool ok = false; try { if (!supportsWriteConcern) { - ok = c->run(opCtx, dbname.toString(), cmdObj, errmsg, result); + ok = c->run(opCtx, dbname.toString(), filteredCmdObj, errmsg, result); } else { // Change the write concern while running the command. const auto oldWC = opCtx->getWriteConcern(); ON_BLOCK_EXIT([&] { opCtx->setWriteConcern(oldWC); }); opCtx->setWriteConcern(wcResult.getValue()); - ok = c->run(opCtx, dbname.toString(), cmdObj, errmsg, result); + ok = c->run(opCtx, dbname.toString(), filteredCmdObj, errmsg, result); } } catch (const DBException& e) { result.resetToEmpty(); diff --git a/src/mongo/s/commands/strategy.h b/src/mongo/s/commands/strategy.h index 9da42bae611..b64551fa0d9 100644 --- a/src/mongo/s/commands/strategy.h +++ b/src/mongo/s/commands/strategy.h @@ -38,6 +38,7 @@ namespace mongo { class DbMessage; struct DbResponse; +class Message; class NamespaceString; class OperationContext; class QueryRequest; @@ -75,15 +76,15 @@ public: static void writeOp(OperationContext* opCtx, DbMessage* dbm); /** - * Executes a legacy-style ($cmd namespace) command. Does not throw and returns the response - * regardless of success or error. + * Executes a command from either OP_QUERY or OP_MSG wire protocols. * * Catches StaleConfigException errors and retries the command automatically after refreshing * the metadata for the failing namespace. */ - static DbResponse clientCommandOp(OperationContext* opCtx, - const NamespaceString& nss, - DbMessage* dbm); + static DbResponse clientOpMsgCommand(OperationContext* opCtx, const Message& message); + static DbResponse clientOpQueryCommand(OperationContext* opCtx, + NamespaceString nss, + DbMessage* dbm); /** * Helper to run an explain of a find operation on the shards. Fills 'out' with the result of diff --git a/src/mongo/s/service_entry_point_mongos.cpp b/src/mongo/s/service_entry_point_mongos.cpp index 118296a2690..deb19862dfc 100644 --- a/src/mongo/s/service_entry_point_mongos.cpp +++ b/src/mongo/s/service_entry_point_mongos.cpp @@ -33,6 +33,7 @@ #include "mongo/s/service_entry_point_mongos.h" #include "mongo/db/auth/authorization_session.h" +#include "mongo/db/commands.h" #include "mongo/db/dbmessage.h" #include "mongo/db/lasterror.h" #include "mongo/db/operation_context.h" @@ -74,7 +75,7 @@ DbResponse ServiceEntryPointMongos::handleRequest(OperationContext* opCtx, // connection uassert(ErrorCodes::IllegalOperation, str::stream() << "Message type " << op << " is not supported.", - isSupportedNetworkOp(op)); + isSupportedNetworkOp(op) && op != dbCommand && op != dbCommandReply); // Start a new LastError session. Any exceptions thrown from here onwards will be returned // to the caller (if the type of the message permits it). @@ -108,9 +109,18 @@ DbResponse ServiceEntryPointMongos::handleRequest(OperationContext* opCtx, << " op: " << networkOpToString(op); switch (op) { + case dbMsg: + dbResponse = Strategy::clientOpMsgCommand(opCtx, message); + break; case dbQuery: if (nss.isCommand() || nss.isSpecialCommand()) { - dbResponse = Strategy::clientCommandOp(opCtx, nss, &dbm); + try { + dbResponse = Strategy::clientOpQueryCommand(opCtx, nss, &dbm); + } catch (const DBException& ex) { + BSONObjBuilder bob; + Command::appendCommandStatus(bob, ex.toStatus()); + dbResponse = replyToQuery(bob.done()); + } } else { dbResponse = Strategy::queryOp(opCtx, nss, &dbm); } |