diff options
author | Mathias Stearn <mathias@10gen.com> | 2017-07-24 15:36:03 -0400 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2017-08-17 22:13:09 -0400 |
commit | 112982eeaddf92cbc14be655061200e23069250a (patch) | |
tree | c5ccae14cc77916823f87eea07f6b39e7d6718e5 /src/mongo/client | |
parent | 5f85627971603bf9f5c832f9d4ca2808b31b0efd (diff) | |
download | mongo-112982eeaddf92cbc14be655061200e23069250a.tar.gz |
SERVER-28510 Add DBClient::runFireAndForgetCommand() and use it to implement legacy op emulation
Diffstat (limited to 'src/mongo/client')
-rw-r--r-- | src/mongo/client/dbclient.cpp | 38 | ||||
-rw-r--r-- | src/mongo/client/dbclient_rs.cpp | 7 | ||||
-rw-r--r-- | src/mongo/client/dbclient_rs.h | 1 | ||||
-rw-r--r-- | src/mongo/client/dbclientinterface.h | 7 |
4 files changed, 45 insertions, 8 deletions
diff --git a/src/mongo/client/dbclient.cpp b/src/mongo/client/dbclient.cpp index 82b4ee65a82..3c42efc050b 100644 --- a/src/mongo/client/dbclient.cpp +++ b/src/mongo/client/dbclient.cpp @@ -188,6 +188,31 @@ rpc::UniqueReply DBClientBase::parseCommandReplyMessage(const std::string& host, return rpc::UniqueReply(replyMsg, std::move(commandReply)); } +DBClientBase* DBClientBase::runFireAndForgetCommand(OpMsgRequest request) { + // Make sure to reconnect if needed before building our request, since the request depends on + // the negotiated protocol which can change due to a reconnect. + checkConnection(); + + if (uassertStatusOK(rpc::negotiate(getClientRPCProtocols(), getServerRPCProtocols())) != + rpc::Protocol::kOpMsg) { + // Other protocols don't support fire-and-forget. Downgrade to two-way command and throw + // away reply. + return runCommandWithTarget(request).second; + } + + if (_metadataWriter) { + BSONObjBuilder metadataBob(std::move(request.body)); + uassertStatusOK( + _metadataWriter((haveClient() ? cc().getOperationContext() : nullptr), &metadataBob)); + request.body = metadataBob.obj(); + } + + auto requestMsg = request.serialize(); + OpMsg::setFlag(&requestMsg, OpMsg::kMoreToCome); + say(requestMsg); + return this; +} + std::pair<rpc::UniqueReply, DBClientBase*> DBClientBase::runCommandWithTarget( OpMsgRequest request) { // Make sure to reconnect if needed before building our request, since the request depends on @@ -1138,8 +1163,7 @@ void DBClientBase::insert(const string& ns, const vector<BSONObj>& v, int flags) OpMsgRequest::fromDBAndBody(nss.db(), BSON("insert" << nss.coll() << "ordered" << ordered)); request.sequences.push_back({"documents", v}); - // Ignoring reply to match fire-and-forget OP_INSERT behavior. - runCommand(std::move(request)); + runFireAndForgetCommand(std::move(request)); } void DBClientBase::remove(const string& ns, Query obj, int flags) { @@ -1149,8 +1173,7 @@ void DBClientBase::remove(const string& ns, Query obj, int flags) { auto request = OpMsgRequest::fromDBAndBody(nss.db(), BSON("delete" << nss.coll())); request.sequences.push_back({"deletes", {BSON("q" << obj.obj << "limit" << limit)}}); - // Ignoring reply to match fire-and-forget OP_REMOVE behavior. - runCommand(std::move(request)); + runFireAndForgetCommand(std::move(request)); } void DBClientBase::update(const string& ns, Query query, BSONObj obj, bool upsert, bool multi) { @@ -1161,8 +1184,7 @@ void DBClientBase::update(const string& ns, Query query, BSONObj obj, bool upser {"updates", {BSON("q" << query.obj << "u" << obj << "upsert" << upsert << "multi" << multi)}}); - // Ignoring reply to match fire-and-forget OP_UPDATE behavior. - runCommand(std::move(request)); + runFireAndForgetCommand(std::move(request)); } void DBClientBase::update(const string& ns, Query query, BSONObj obj, int flags) { @@ -1174,8 +1196,8 @@ void DBClientBase::update(const string& ns, Query query, BSONObj obj, int flags) } void DBClientBase::killCursor(const NamespaceString& ns, long long cursorId) { - // Ignoring reply to match fire-and-forget OP_KILLCURSORS behavior. - runCommand(OpMsgRequest::fromDBAndBody(ns.db(), KillCursorsRequest(ns, {cursorId}).toBSON())); + runFireAndForgetCommand( + OpMsgRequest::fromDBAndBody(ns.db(), KillCursorsRequest(ns, {cursorId}).toBSON())); } list<BSONObj> DBClientBase::getIndexSpecs(const string& ns, int options) { diff --git a/src/mongo/client/dbclient_rs.cpp b/src/mongo/client/dbclient_rs.cpp index 13d706b7cd6..667da194490 100644 --- a/src/mongo/client/dbclient_rs.cpp +++ b/src/mongo/client/dbclient_rs.cpp @@ -901,6 +901,13 @@ void DBClientReplicaSet::checkResponse(const std::vector<BSONObj>& batch, } } +DBClientBase* DBClientReplicaSet::runFireAndForgetCommand(OpMsgRequest request) { + // Assume all fire-and-forget commands should go to the primary node. It is currently used + // for writes which need to go to the primary and for killCursors which should be sent to a + // specific host rather than through DBClientReplicaSet. + return checkMaster()->runFireAndForgetCommand(std::move(request)); +} + std::pair<rpc::UniqueReply, DBClientBase*> DBClientReplicaSet::runCommandWithTarget( OpMsgRequest request) { // This overload exists so we can parse out the read preference and then use server diff --git a/src/mongo/client/dbclient_rs.h b/src/mongo/client/dbclient_rs.h index 25844b264bc..9d554feed1b 100644 --- a/src/mongo/client/dbclient_rs.h +++ b/src/mongo/client/dbclient_rs.h @@ -188,6 +188,7 @@ public: using DBClientBase::runCommandWithTarget; std::pair<rpc::UniqueReply, DBClientBase*> runCommandWithTarget(OpMsgRequest request) final; + DBClientBase* runFireAndForgetCommand(OpMsgRequest request) final; void setRequestMetadataWriter(rpc::RequestMetadataWriter writer) final; diff --git a/src/mongo/client/dbclientinterface.h b/src/mongo/client/dbclientinterface.h index a1a653fc9ef..06494f61e8c 100644 --- a/src/mongo/client/dbclientinterface.h +++ b/src/mongo/client/dbclientinterface.h @@ -294,6 +294,13 @@ public: return runCommandWithTarget(std::move(request)).first; } + /** + * Runs the specified command request in fire-and-forget mode and returns the connection that + * the command was actually sent on. If the connection doesn't support OP_MSG, the request will + * be run as a normal two-way command and the reply will be ignored after parsing. + */ + virtual DBClientBase* runFireAndForgetCommand(OpMsgRequest request); + /** Run a database command. Database commands are represented as BSON objects. Common database commands have prebuilt helper functions -- see below. If a helper is not available you can directly call runCommand. |