diff options
author | Mathias Stearn <mathias@10gen.com> | 2017-07-24 15:36:03 -0400 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2017-08-17 22:13:09 -0400 |
commit | 112982eeaddf92cbc14be655061200e23069250a (patch) | |
tree | c5ccae14cc77916823f87eea07f6b39e7d6718e5 | |
parent | 5f85627971603bf9f5c832f9d4ca2808b31b0efd (diff) | |
download | mongo-112982eeaddf92cbc14be655061200e23069250a.tar.gz |
SERVER-28510 Add DBClient::runFireAndForgetCommand() and use it to implement legacy op emulation
-rw-r--r-- | src/mongo/client/dbclient.cpp | 38 | ||||
-rw-r--r-- | src/mongo/client/dbclient_rs.cpp | 7 | ||||
-rw-r--r-- | src/mongo/client/dbclient_rs.h | 1 | ||||
-rw-r--r-- | src/mongo/client/dbclientinterface.h | 7 | ||||
-rw-r--r-- | src/mongo/tools/bridge.cpp | 9 |
5 files changed, 52 insertions, 10 deletions
diff --git a/src/mongo/client/dbclient.cpp b/src/mongo/client/dbclient.cpp index 82b4ee65a82..3c42efc050b 100644 --- a/src/mongo/client/dbclient.cpp +++ b/src/mongo/client/dbclient.cpp @@ -188,6 +188,31 @@ rpc::UniqueReply DBClientBase::parseCommandReplyMessage(const std::string& host, return rpc::UniqueReply(replyMsg, std::move(commandReply)); } +DBClientBase* DBClientBase::runFireAndForgetCommand(OpMsgRequest request) { + // Make sure to reconnect if needed before building our request, since the request depends on + // the negotiated protocol which can change due to a reconnect. + checkConnection(); + + if (uassertStatusOK(rpc::negotiate(getClientRPCProtocols(), getServerRPCProtocols())) != + rpc::Protocol::kOpMsg) { + // Other protocols don't support fire-and-forget. Downgrade to two-way command and throw + // away reply. + return runCommandWithTarget(request).second; + } + + if (_metadataWriter) { + BSONObjBuilder metadataBob(std::move(request.body)); + uassertStatusOK( + _metadataWriter((haveClient() ? cc().getOperationContext() : nullptr), &metadataBob)); + request.body = metadataBob.obj(); + } + + auto requestMsg = request.serialize(); + OpMsg::setFlag(&requestMsg, OpMsg::kMoreToCome); + say(requestMsg); + return this; +} + std::pair<rpc::UniqueReply, DBClientBase*> DBClientBase::runCommandWithTarget( OpMsgRequest request) { // Make sure to reconnect if needed before building our request, since the request depends on @@ -1138,8 +1163,7 @@ void DBClientBase::insert(const string& ns, const vector<BSONObj>& v, int flags) OpMsgRequest::fromDBAndBody(nss.db(), BSON("insert" << nss.coll() << "ordered" << ordered)); request.sequences.push_back({"documents", v}); - // Ignoring reply to match fire-and-forget OP_INSERT behavior. - runCommand(std::move(request)); + runFireAndForgetCommand(std::move(request)); } void DBClientBase::remove(const string& ns, Query obj, int flags) { @@ -1149,8 +1173,7 @@ void DBClientBase::remove(const string& ns, Query obj, int flags) { auto request = OpMsgRequest::fromDBAndBody(nss.db(), BSON("delete" << nss.coll())); request.sequences.push_back({"deletes", {BSON("q" << obj.obj << "limit" << limit)}}); - // Ignoring reply to match fire-and-forget OP_REMOVE behavior. - runCommand(std::move(request)); + runFireAndForgetCommand(std::move(request)); } void DBClientBase::update(const string& ns, Query query, BSONObj obj, bool upsert, bool multi) { @@ -1161,8 +1184,7 @@ void DBClientBase::update(const string& ns, Query query, BSONObj obj, bool upser {"updates", {BSON("q" << query.obj << "u" << obj << "upsert" << upsert << "multi" << multi)}}); - // Ignoring reply to match fire-and-forget OP_UPDATE behavior. - runCommand(std::move(request)); + runFireAndForgetCommand(std::move(request)); } void DBClientBase::update(const string& ns, Query query, BSONObj obj, int flags) { @@ -1174,8 +1196,8 @@ void DBClientBase::update(const string& ns, Query query, BSONObj obj, int flags) } void DBClientBase::killCursor(const NamespaceString& ns, long long cursorId) { - // Ignoring reply to match fire-and-forget OP_KILLCURSORS behavior. - runCommand(OpMsgRequest::fromDBAndBody(ns.db(), KillCursorsRequest(ns, {cursorId}).toBSON())); + runFireAndForgetCommand( + OpMsgRequest::fromDBAndBody(ns.db(), KillCursorsRequest(ns, {cursorId}).toBSON())); } list<BSONObj> DBClientBase::getIndexSpecs(const string& ns, int options) { diff --git a/src/mongo/client/dbclient_rs.cpp b/src/mongo/client/dbclient_rs.cpp index 13d706b7cd6..667da194490 100644 --- a/src/mongo/client/dbclient_rs.cpp +++ b/src/mongo/client/dbclient_rs.cpp @@ -901,6 +901,13 @@ void DBClientReplicaSet::checkResponse(const std::vector<BSONObj>& batch, } } +DBClientBase* DBClientReplicaSet::runFireAndForgetCommand(OpMsgRequest request) { + // Assume all fire-and-forget commands should go to the primary node. It is currently used + // for writes which need to go to the primary and for killCursors which should be sent to a + // specific host rather than through DBClientReplicaSet. + return checkMaster()->runFireAndForgetCommand(std::move(request)); +} + std::pair<rpc::UniqueReply, DBClientBase*> DBClientReplicaSet::runCommandWithTarget( OpMsgRequest request) { // This overload exists so we can parse out the read preference and then use server diff --git a/src/mongo/client/dbclient_rs.h b/src/mongo/client/dbclient_rs.h index 25844b264bc..9d554feed1b 100644 --- a/src/mongo/client/dbclient_rs.h +++ b/src/mongo/client/dbclient_rs.h @@ -188,6 +188,7 @@ public: using DBClientBase::runCommandWithTarget; std::pair<rpc::UniqueReply, DBClientBase*> runCommandWithTarget(OpMsgRequest request) final; + DBClientBase* runFireAndForgetCommand(OpMsgRequest request) final; void setRequestMetadataWriter(rpc::RequestMetadataWriter writer) final; diff --git a/src/mongo/client/dbclientinterface.h b/src/mongo/client/dbclientinterface.h index a1a653fc9ef..06494f61e8c 100644 --- a/src/mongo/client/dbclientinterface.h +++ b/src/mongo/client/dbclientinterface.h @@ -294,6 +294,13 @@ public: return runCommandWithTarget(std::move(request)).first; } + /** + * Runs the specified command request in fire-and-forget mode and returns the connection that + * the command was actually sent on. If the connection doesn't support OP_MSG, the request will + * be run as a normal two-way command and the reply will be ignored after parsing. + */ + virtual DBClientBase* runFireAndForgetCommand(OpMsgRequest request); + /** Run a database command. Database commands are represented as BSON objects. Common database commands have prebuilt helper functions -- see below. If a helper is not available you can directly call runCommand. diff --git a/src/mongo/tools/bridge.cpp b/src/mongo/tools/bridge.cpp index 07942a6a61b..5f9eac1e4e0 100644 --- a/src/mongo/tools/bridge.cpp +++ b/src/mongo/tools/bridge.cpp @@ -150,6 +150,8 @@ public: request = std::move(swm.getValue()); } + const bool isFireAndForgetCommand = OpMsg::isFlagSet(request, OpMsg::kMoreToCome); + boost::optional<OpMsgRequest> cmdRequest; if ((request.operation() == dbQuery && NamespaceString(DbMessage(request).getns()).isCommand()) || @@ -170,6 +172,8 @@ public: // The 'request' is consumed by the mongobridge and does not get forwarded to // 'dest'. if (auto status = maybeProcessBridgeCommand(cmdRequest)) { + invariant(!isFireAndForgetCommand); + auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(request)); BSONObj metadata; BSONObj reply; @@ -222,8 +226,9 @@ public: // Send the message we received from '_mp' to 'dest'. 'dest' returns a response for // OP_QUERY, OP_GET_MORE, and OP_COMMAND messages that we respond back to // '_mp' with. - if (request.operation() == dbQuery || request.operation() == dbGetMore || - request.operation() == dbCommand || request.operation() == dbMsg) { + if (!isFireAndForgetCommand && + (request.operation() == dbQuery || request.operation() == dbGetMore || + request.operation() == dbCommand || request.operation() == dbMsg)) { // TODO dbMsg moreToCome // Forward the message to 'dest' and receive its reply in 'response'. response.reset(); |