summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorMathias Stearn <mathias@10gen.com>2017-04-12 16:52:28 -0400
committerMathias Stearn <mathias@10gen.com>2017-05-12 12:10:09 -0400
commitbf96ba7825a92e2fb0137523c92bfa6a862abe68 (patch)
treee6ff0031075935100bec6ef3e80d98cd8594576f /src/mongo
parentf2902d59175c0724944ca98d13f784e2de944053 (diff)
downloadmongo-bf96ba7825a92e2fb0137523c92bfa6a862abe68.tar.gz
SERVER-28816 OP_MSG for mongos ingress layer
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/commands.h4
-rw-r--r--src/mongo/db/commands_helpers.h2
-rw-r--r--src/mongo/rpc/protocol.cpp1
-rw-r--r--src/mongo/s/commands/strategy.cpp193
-rw-r--r--src/mongo/s/commands/strategy.h11
-rw-r--r--src/mongo/s/service_entry_point_mongos.cpp14
6 files changed, 141 insertions, 84 deletions
diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h
index 780af5827cc..f169f651e6f 100644
--- a/src/mongo/db/commands.h
+++ b/src/mongo/db/commands.h
@@ -471,7 +471,8 @@ public:
*/
static bool isGenericArgument(StringData arg) {
// Not including "help" since we don't pass help requests through to the command parser.
- // If that changes, it should be added.
+ // If that changes, it should be added. When you add to this list, consider whether you
+ // should also change the filterCommandRequestForPassthrough() function in sharding.
return arg == "$audit" || //
arg == "$client" || //
arg == "$configServerState" || //
@@ -526,7 +527,6 @@ private:
friend void mongo::execCommandClient(OperationContext* opCtx,
Command* c,
- int queryOptions,
StringData dbname,
BSONObj& cmdObj,
BSONObjBuilder& result);
diff --git a/src/mongo/db/commands_helpers.h b/src/mongo/db/commands_helpers.h
index 5880c184cdf..93d278637fa 100644
--- a/src/mongo/db/commands_helpers.h
+++ b/src/mongo/db/commands_helpers.h
@@ -33,6 +33,7 @@ class OperationContext;
class Command;
class BSONObj;
class BSONObjBuilder;
+class StringData;
namespace rpc {
class RequestInterface;
@@ -47,7 +48,6 @@ class ReplyBuilderInterface;
// Implemented in `src/mongo/s/s_only.cpp`.
void execCommandClient(OperationContext* opCtx,
Command* c,
- int queryOptions,
StringData dbname,
BSONObj& cmdObj,
BSONObjBuilder& result);
diff --git a/src/mongo/rpc/protocol.cpp b/src/mongo/rpc/protocol.cpp
index 5799b0bcc7d..02d54c5db55 100644
--- a/src/mongo/rpc/protocol.cpp
+++ b/src/mongo/rpc/protocol.cpp
@@ -154,7 +154,6 @@ StatusWith<ProtocolSetAndWireVersionInfo> parseProtocolSetFromIsMasterReply(
if (isMongos) {
// Remove support for protocols that mongos doesn't support.
protos &= ~supports::kOpCommandOnly;
- protos &= ~supports::kOpMsgOnly; // TODO remove this line once mongos supports OP_MSG.
}
return {{protos, version}};
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index 11cd2ef1381..d2251b6b015 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -71,6 +71,7 @@
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
+#include "mongo/util/net/op_msg.h"
#include "mongo/util/scopeguard.h"
#include "mongo/util/timer.h"
@@ -87,18 +88,11 @@ namespace {
const std::string kOperationTime = "operationTime";
void runAgainstRegistered(OperationContext* opCtx,
- const NamespaceString& nss,
+ StringData db,
BSONObj& jsobj,
- BSONObjBuilder& anObjBuilder,
- int queryOptions) {
- // It should be impossible for this uassert to fail since there should be no way to get
- // into this function with any other collection name.
- uassert(16618,
- "Illegal attempt to run a command against a namespace other than $cmd.",
- nss.isCommand());
-
+ BSONObjBuilder& anObjBuilder) {
BSONElement e = jsobj.firstElement();
- std::string commandName = e.fieldName();
+ const auto commandName = e.fieldNameStringData();
Command* c = e.type() ? Command::findCommand(commandName) : NULL;
if (!c) {
Command::appendCommandStatus(
@@ -108,7 +102,7 @@ void runAgainstRegistered(OperationContext* opCtx,
return;
}
- execCommandClient(opCtx, c, queryOptions, nss.db(), jsobj, anObjBuilder);
+ execCommandClient(opCtx, c, db, jsobj, anObjBuilder);
}
/**
@@ -125,13 +119,8 @@ void execCommandHandler(OperationContext* opCtx,
std::tie(cmdObj, queryFlags) =
rpc::downconvertRequestMetadata(request.getCommandArgs(), request.getMetadata());
- std::string db = request.getDatabase().toString();
- uassert(ErrorCodes::InvalidNamespace,
- str::stream() << "Invalid database name: '" << db << "'",
- NamespaceString::validDBName(db, NamespaceString::DollarInDbNameBehavior::Allow));
-
BSONObjBuilder result;
- execCommandClient(opCtx, command, queryFlags, db, cmdObj, result);
+ execCommandClient(opCtx, command, request.getDatabase(), cmdObj, result);
replyBuilder->setCommandReply(result.done()).setMetadata(rpc::makeEmptyMetadata());
}
@@ -140,6 +129,9 @@ void execCommandHandler(OperationContext* opCtx,
* Extract and process metadata from the command request body.
*/
Status processCommandMetadata(OperationContext* opCtx, const BSONObj& cmdObj) {
+ ReadPreferenceSetting::get(opCtx) =
+ uassertStatusOK(ReadPreferenceSetting::fromContainingBSON(cmdObj));
+
auto logicalClock = LogicalClock::get(opCtx);
invariant(logicalClock);
@@ -283,12 +275,17 @@ DbResponse Strategy::queryOp(OperationContext* opCtx, const NamespaceString& nss
cursorId.getValue())};
}
-DbResponse Strategy::clientCommandOp(OperationContext* opCtx,
- const NamespaceString& nss,
- DbMessage* dbm) {
+static void runCommand(OperationContext* opCtx,
+ StringData db,
+ BSONObj cmdObj,
+ BSONObjBuilder&& builder);
+
+DbResponse Strategy::clientOpQueryCommand(OperationContext* opCtx,
+ NamespaceString nss,
+ DbMessage* dbm) {
const QueryMessage q(*dbm);
- LOG(3) << "command: " << q.ns << " " << redact(q.query) << " ntoreturn: " << q.ntoreturn
+ LOG(3) << "command: " << nss << " " << redact(q.query) << " ntoreturn: " << q.ntoreturn
<< " options: " << q.queryOptions;
if (q.queryOptions & QueryOption_Exhaust) {
@@ -304,36 +301,32 @@ DbResponse Strategy::clientCommandOp(OperationContext* opCtx,
<< ") for $cmd type ns - can only be 1 or -1",
q.ntoreturn == 1 || q.ntoreturn == -1);
+ BSONObj cmdObj = q.query;
+
// Handle the $cmd.sys pseudo-commands
if (nss.isSpecialCommand()) {
const auto upgradeToRealCommand = [&](StringData commandName) {
BSONObjBuilder cmdBob;
cmdBob.append(commandName, 1);
- cmdBob.appendElements(q.query); // fields are validated by Commands
- auto interposedCmd = cmdBob.done();
-
- // Rewrite upgraded pseudoCommands to run on the 'admin' database.
- const NamespaceString interposedNss("admin", "$cmd");
- BSONObjBuilder reply;
- runAgainstRegistered(opCtx, interposedNss, interposedCmd, reply, q.queryOptions);
- return replyToQuery(reply.done());
+ cmdBob.appendElements(cmdObj); // fields are validated by Commands
+ return cmdBob.obj();
};
if (nss.coll() == "$cmd.sys.inprog") {
- return upgradeToRealCommand("currentOp");
+ cmdObj = upgradeToRealCommand("currentOp");
} else if (nss.coll() == "$cmd.sys.killop") {
- return upgradeToRealCommand("killOp");
+ cmdObj = upgradeToRealCommand("killOp");
} else if (nss.coll() == "$cmd.sys.unlock") {
- return replyToQuery(BSON("$err"
- << "can't do unlock through mongos"),
- ResultFlag_ErrSet);
+ uasserted(40442, "can't do unlock through mongos");
+ } else {
+ uasserted(40443, str::stream() << "unknown psuedo-command namespace " << nss.ns());
}
- // No pseudo-command found, fall through to execute as a regular query
+ // These commands must be run against the admin db even though the psuedo commands
+ // ignored the db.
+ nss = NamespaceString("admin", "$cmd");
}
- BSONObj cmdObj = q.query;
-
{
bool haveReadPref = false;
BSONElement e = cmdObj.firstElement();
@@ -342,19 +335,11 @@ DbResponse Strategy::clientCommandOp(OperationContext* opCtx,
// Extract the embedded query object.
if (auto readPrefElem = cmdObj[Query::ReadPrefField.name()]) {
// The command has a read preference setting. We don't want to lose this information
- // so we put it on the OperationContext and copy it to a new field called
- // $queryOptions.$readPreference
- ReadPreferenceSetting::get(opCtx) =
- uassertStatusOK(ReadPreferenceSetting::fromInnerBSON(readPrefElem));
-
+ // so we copy it to a new field called $queryOptions.$readPreference
haveReadPref = true;
BSONObjBuilder finalCmdObjBuilder;
finalCmdObjBuilder.appendElements(e.embeddedObject());
-
- BSONObjBuilder queryOptionsBuilder(finalCmdObjBuilder.subobjStart("$queryOptions"));
- queryOptionsBuilder.append(readPrefElem);
- queryOptionsBuilder.done();
-
+ finalCmdObjBuilder.append(readPrefElem);
cmdObj = finalCmdObjBuilder.obj();
} else {
cmdObj = e.embeddedObject();
@@ -365,19 +350,22 @@ DbResponse Strategy::clientCommandOp(OperationContext* opCtx,
// If the slaveOK bit is set, behave as-if read preference secondary-preferred was
// specified.
const auto readPref = ReadPreferenceSetting(ReadPreference::SecondaryPreferred);
- ReadPreferenceSetting::get(opCtx) = readPref;
-
BSONObjBuilder finalCmdObjBuilder;
finalCmdObjBuilder.appendElements(cmdObj);
-
- BSONObjBuilder queryOptionsBuilder(finalCmdObjBuilder.subobjStart("$queryOptions"));
- readPref.toContainingBSON(&queryOptionsBuilder);
- queryOptionsBuilder.doneFast();
-
+ readPref.toContainingBSON(&finalCmdObjBuilder);
cmdObj = finalCmdObjBuilder.obj();
}
}
+ OpQueryReplyBuilder reply;
+ runCommand(opCtx, nss.db(), cmdObj, BSONObjBuilder(reply.bufBuilderForResults()));
+ return DbResponse{reply.toCommandReply()};
+}
+
+static void runCommand(OperationContext* opCtx,
+ StringData db,
+ BSONObj cmdObj,
+ BSONObjBuilder&& builder) {
// Handle command option maxTimeMS.
uassert(ErrorCodes::InvalidOptions,
"no such command option $maxTimeMs; use maxTimeMS instead",
@@ -392,23 +380,21 @@ DbResponse Strategy::clientCommandOp(OperationContext* opCtx,
int loops = 5;
while (true) {
+ builder.resetToEmpty();
try {
- OpQueryReplyBuilder reply;
- {
- BSONObjBuilder builder(reply.bufBuilderForResults());
- runAgainstRegistered(opCtx, NamespaceString(q.ns), cmdObj, builder, q.queryOptions);
- }
- return DbResponse{reply.toCommandReply()};
+ runAgainstRegistered(opCtx, db, cmdObj, builder);
+ return;
} catch (const StaleConfigException& e) {
if (loops <= 0)
throw e;
loops--;
- log() << "Retrying command " << redact(q.query) << causedBy(e);
+ log() << "Retrying command " << redact(cmdObj) << causedBy(e);
// For legacy reasons, ns may not actually be set in the exception
- const std::string staleNS(e.getns().empty() ? std::string(q.ns) : e.getns());
+ const std::string staleNS(e.getns().empty() ? NamespaceString(db).getCommandNS().ns()
+ : e.getns());
ShardConnection::checkMyConnectionVersions(opCtx, staleNS);
if (loops < 4) {
@@ -417,17 +403,36 @@ DbResponse Strategy::clientCommandOp(OperationContext* opCtx,
Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNSS);
}
}
+ continue;
} catch (const DBException& e) {
- OpQueryReplyBuilder reply;
- {
- BSONObjBuilder builder(reply.bufBuilderForResults());
- Command::appendCommandStatus(builder, e.toStatus());
- }
- return DbResponse{reply.toCommandReply()};
+ builder.resetToEmpty();
+ Command::appendCommandStatus(builder, e.toStatus());
+ return;
}
+ MONGO_UNREACHABLE;
}
}
+DbResponse Strategy::clientOpMsgCommand(OperationContext* opCtx, const Message& m) {
+ auto request = OpMsg::parse(m);
+ OpMsgBuilder reply;
+ try {
+ std::string db = "admin";
+ if (auto elem = request.body["$db"])
+ db = elem.String();
+
+ runCommand(opCtx, db, request.body, reply.beginBody());
+ } catch (const DBException& ex) {
+ reply.reset();
+ auto bob = reply.beginBody();
+ Command::appendCommandStatus(bob, ex.toStatus());
+ }
+
+ if (request.isFlagSet(OpMsg::kMoreToCome))
+ return {};
+ return DbResponse{reply.finish()};
+}
+
void Strategy::commandOp(OperationContext* opCtx,
const string& db,
const BSONObj& command,
@@ -580,12 +585,11 @@ void Strategy::writeOp(OperationContext* opCtx, DbMessage* dbm) {
// Adjust namespace for command
const NamespaceString& fullNS(commandRequest->getNS());
- const NamespaceString& cmdNS = fullNS.getCommandNS();
BSONObj commandBSON = commandRequest->toBSON();
BSONObjBuilder builder;
- runAgainstRegistered(opCtx, cmdNS, commandBSON, builder, 0);
+ runAgainstRegistered(opCtx, fullNS.db(), commandBSON, builder);
bool parsed = commandResponse.parseBSON(builder.done(), nullptr);
(void)parsed; // for compile
@@ -651,18 +655,60 @@ Status Strategy::explainFind(OperationContext* opCtx,
out);
}
+
+namespace {
+/**
+ * Rewrites cmdObj into the format expected by mongos Command::run() implementations.
+ *
+ * This performs 2 transformations:
+ * 1) $readPreference fields are moved into a subobject called $queryOptions. This matches the
+ * "wrapped" format historically used internally by mongos. Moving off of that format will be
+ * done as SERVER-29091.
+ *
+ * 2) Filter out generic arguments that shouldn't be blindly passed to the shards. This is
+ * necessary because many mongos implementations of Command::run() just pass cmdObj through
+ * directly to the shards. However, some of the generic arguments fields are automatically
+ * appended in the egress layer. Removing them here ensures that they don't get duplicated.
+ *
+ * Ideally this function can be deleted once mongos run() implementations are more careful about
+ * what they send to the shards.
+ */
+BSONObj filterCommandRequestForPassthrough(const BSONObj& cmdObj) {
+ BSONObjBuilder bob;
+ for (auto elem : cmdObj) {
+ const auto name = elem.fieldNameStringData();
+ if (name == "$readPreference") {
+ BSONObjBuilder(bob.subobjStart("$queryOptions")).append(elem);
+ } else if (!Command::isGenericArgument(name) || name == "maxTimeMS" ||
+ name == "readConcern" || name == "writeConcern") {
+ // This is the whitelist of generic arguments that commands can be trusted to blindly
+ // forward to the shards.
+ bob.append(elem);
+ }
+ }
+ return bob.obj();
+}
+}
+
/**
* Called into by the commands infrastructure.
*/
void execCommandClient(OperationContext* opCtx,
Command* c,
- int queryOptions,
StringData dbname,
BSONObj& cmdObj,
BSONObjBuilder& result) {
ON_BLOCK_EXIT([opCtx, &result] { appendRequiredFieldsToResponse(opCtx, &result); });
+ uassert(ErrorCodes::IllegalOperation,
+ "Can't use 'local' database through mongos",
+ dbname != NamespaceString::kLocalDb);
+
+ uassert(ErrorCodes::InvalidNamespace,
+ str::stream() << "Invalid database name: '" << dbname << "'",
+ NamespaceString::validDBName(dbname, NamespaceString::DollarInDbNameBehavior::Allow));
+
dassert(dbname == nsToDatabase(dbname));
StringMap<int> topLevelFields;
for (auto&& element : cmdObj) {
@@ -722,18 +768,19 @@ void execCommandClient(OperationContext* opCtx,
return;
}
+ auto filteredCmdObj = filterCommandRequestForPassthrough(cmdObj);
std::string errmsg;
bool ok = false;
try {
if (!supportsWriteConcern) {
- ok = c->run(opCtx, dbname.toString(), cmdObj, errmsg, result);
+ ok = c->run(opCtx, dbname.toString(), filteredCmdObj, errmsg, result);
} else {
// Change the write concern while running the command.
const auto oldWC = opCtx->getWriteConcern();
ON_BLOCK_EXIT([&] { opCtx->setWriteConcern(oldWC); });
opCtx->setWriteConcern(wcResult.getValue());
- ok = c->run(opCtx, dbname.toString(), cmdObj, errmsg, result);
+ ok = c->run(opCtx, dbname.toString(), filteredCmdObj, errmsg, result);
}
} catch (const DBException& e) {
result.resetToEmpty();
diff --git a/src/mongo/s/commands/strategy.h b/src/mongo/s/commands/strategy.h
index 9da42bae611..b64551fa0d9 100644
--- a/src/mongo/s/commands/strategy.h
+++ b/src/mongo/s/commands/strategy.h
@@ -38,6 +38,7 @@ namespace mongo {
class DbMessage;
struct DbResponse;
+class Message;
class NamespaceString;
class OperationContext;
class QueryRequest;
@@ -75,15 +76,15 @@ public:
static void writeOp(OperationContext* opCtx, DbMessage* dbm);
/**
- * Executes a legacy-style ($cmd namespace) command. Does not throw and returns the response
- * regardless of success or error.
+ * Executes a command from either OP_QUERY or OP_MSG wire protocols.
*
* Catches StaleConfigException errors and retries the command automatically after refreshing
* the metadata for the failing namespace.
*/
- static DbResponse clientCommandOp(OperationContext* opCtx,
- const NamespaceString& nss,
- DbMessage* dbm);
+ static DbResponse clientOpMsgCommand(OperationContext* opCtx, const Message& message);
+ static DbResponse clientOpQueryCommand(OperationContext* opCtx,
+ NamespaceString nss,
+ DbMessage* dbm);
/**
* Helper to run an explain of a find operation on the shards. Fills 'out' with the result of
diff --git a/src/mongo/s/service_entry_point_mongos.cpp b/src/mongo/s/service_entry_point_mongos.cpp
index 118296a2690..deb19862dfc 100644
--- a/src/mongo/s/service_entry_point_mongos.cpp
+++ b/src/mongo/s/service_entry_point_mongos.cpp
@@ -33,6 +33,7 @@
#include "mongo/s/service_entry_point_mongos.h"
#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/commands.h"
#include "mongo/db/dbmessage.h"
#include "mongo/db/lasterror.h"
#include "mongo/db/operation_context.h"
@@ -74,7 +75,7 @@ DbResponse ServiceEntryPointMongos::handleRequest(OperationContext* opCtx,
// connection
uassert(ErrorCodes::IllegalOperation,
str::stream() << "Message type " << op << " is not supported.",
- isSupportedNetworkOp(op));
+ isSupportedNetworkOp(op) && op != dbCommand && op != dbCommandReply);
// Start a new LastError session. Any exceptions thrown from here onwards will be returned
// to the caller (if the type of the message permits it).
@@ -108,9 +109,18 @@ DbResponse ServiceEntryPointMongos::handleRequest(OperationContext* opCtx,
<< " op: " << networkOpToString(op);
switch (op) {
+ case dbMsg:
+ dbResponse = Strategy::clientOpMsgCommand(opCtx, message);
+ break;
case dbQuery:
if (nss.isCommand() || nss.isSpecialCommand()) {
- dbResponse = Strategy::clientCommandOp(opCtx, nss, &dbm);
+ try {
+ dbResponse = Strategy::clientOpQueryCommand(opCtx, nss, &dbm);
+ } catch (const DBException& ex) {
+ BSONObjBuilder bob;
+ Command::appendCommandStatus(bob, ex.toStatus());
+ dbResponse = replyToQuery(bob.done());
+ }
} else {
dbResponse = Strategy::queryOp(opCtx, nss, &dbm);
}