summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/client/dbclient.cpp38
-rw-r--r--src/mongo/client/dbclient_rs.cpp7
-rw-r--r--src/mongo/client/dbclient_rs.h1
-rw-r--r--src/mongo/client/dbclientinterface.h7
-rw-r--r--src/mongo/tools/bridge.cpp9
5 files changed, 52 insertions, 10 deletions
diff --git a/src/mongo/client/dbclient.cpp b/src/mongo/client/dbclient.cpp
index 82b4ee65a82..3c42efc050b 100644
--- a/src/mongo/client/dbclient.cpp
+++ b/src/mongo/client/dbclient.cpp
@@ -188,6 +188,31 @@ rpc::UniqueReply DBClientBase::parseCommandReplyMessage(const std::string& host,
return rpc::UniqueReply(replyMsg, std::move(commandReply));
}
+DBClientBase* DBClientBase::runFireAndForgetCommand(OpMsgRequest request) {
+ // Make sure to reconnect if needed before building our request, since the request depends on
+ // the negotiated protocol which can change due to a reconnect.
+ checkConnection();
+
+ if (uassertStatusOK(rpc::negotiate(getClientRPCProtocols(), getServerRPCProtocols())) !=
+ rpc::Protocol::kOpMsg) {
+ // Other protocols don't support fire-and-forget. Downgrade to two-way command and throw
+ // away reply.
+ return runCommandWithTarget(request).second;
+ }
+
+ if (_metadataWriter) {
+ BSONObjBuilder metadataBob(std::move(request.body));
+ uassertStatusOK(
+ _metadataWriter((haveClient() ? cc().getOperationContext() : nullptr), &metadataBob));
+ request.body = metadataBob.obj();
+ }
+
+ auto requestMsg = request.serialize();
+ OpMsg::setFlag(&requestMsg, OpMsg::kMoreToCome);
+ say(requestMsg);
+ return this;
+}
+
std::pair<rpc::UniqueReply, DBClientBase*> DBClientBase::runCommandWithTarget(
OpMsgRequest request) {
// Make sure to reconnect if needed before building our request, since the request depends on
@@ -1138,8 +1163,7 @@ void DBClientBase::insert(const string& ns, const vector<BSONObj>& v, int flags)
OpMsgRequest::fromDBAndBody(nss.db(), BSON("insert" << nss.coll() << "ordered" << ordered));
request.sequences.push_back({"documents", v});
- // Ignoring reply to match fire-and-forget OP_INSERT behavior.
- runCommand(std::move(request));
+ runFireAndForgetCommand(std::move(request));
}
void DBClientBase::remove(const string& ns, Query obj, int flags) {
@@ -1149,8 +1173,7 @@ void DBClientBase::remove(const string& ns, Query obj, int flags) {
auto request = OpMsgRequest::fromDBAndBody(nss.db(), BSON("delete" << nss.coll()));
request.sequences.push_back({"deletes", {BSON("q" << obj.obj << "limit" << limit)}});
- // Ignoring reply to match fire-and-forget OP_REMOVE behavior.
- runCommand(std::move(request));
+ runFireAndForgetCommand(std::move(request));
}
void DBClientBase::update(const string& ns, Query query, BSONObj obj, bool upsert, bool multi) {
@@ -1161,8 +1184,7 @@ void DBClientBase::update(const string& ns, Query query, BSONObj obj, bool upser
{"updates",
{BSON("q" << query.obj << "u" << obj << "upsert" << upsert << "multi" << multi)}});
- // Ignoring reply to match fire-and-forget OP_UPDATE behavior.
- runCommand(std::move(request));
+ runFireAndForgetCommand(std::move(request));
}
void DBClientBase::update(const string& ns, Query query, BSONObj obj, int flags) {
@@ -1174,8 +1196,8 @@ void DBClientBase::update(const string& ns, Query query, BSONObj obj, int flags)
}
void DBClientBase::killCursor(const NamespaceString& ns, long long cursorId) {
- // Ignoring reply to match fire-and-forget OP_KILLCURSORS behavior.
- runCommand(OpMsgRequest::fromDBAndBody(ns.db(), KillCursorsRequest(ns, {cursorId}).toBSON()));
+ runFireAndForgetCommand(
+ OpMsgRequest::fromDBAndBody(ns.db(), KillCursorsRequest(ns, {cursorId}).toBSON()));
}
list<BSONObj> DBClientBase::getIndexSpecs(const string& ns, int options) {
diff --git a/src/mongo/client/dbclient_rs.cpp b/src/mongo/client/dbclient_rs.cpp
index 13d706b7cd6..667da194490 100644
--- a/src/mongo/client/dbclient_rs.cpp
+++ b/src/mongo/client/dbclient_rs.cpp
@@ -901,6 +901,13 @@ void DBClientReplicaSet::checkResponse(const std::vector<BSONObj>& batch,
}
}
+DBClientBase* DBClientReplicaSet::runFireAndForgetCommand(OpMsgRequest request) {
+ // Assume all fire-and-forget commands should go to the primary node. It is currently used
+ // for writes which need to go to the primary and for killCursors which should be sent to a
+ // specific host rather than through DBClientReplicaSet.
+ return checkMaster()->runFireAndForgetCommand(std::move(request));
+}
+
std::pair<rpc::UniqueReply, DBClientBase*> DBClientReplicaSet::runCommandWithTarget(
OpMsgRequest request) {
// This overload exists so we can parse out the read preference and then use server
diff --git a/src/mongo/client/dbclient_rs.h b/src/mongo/client/dbclient_rs.h
index 25844b264bc..9d554feed1b 100644
--- a/src/mongo/client/dbclient_rs.h
+++ b/src/mongo/client/dbclient_rs.h
@@ -188,6 +188,7 @@ public:
using DBClientBase::runCommandWithTarget;
std::pair<rpc::UniqueReply, DBClientBase*> runCommandWithTarget(OpMsgRequest request) final;
+ DBClientBase* runFireAndForgetCommand(OpMsgRequest request) final;
void setRequestMetadataWriter(rpc::RequestMetadataWriter writer) final;
diff --git a/src/mongo/client/dbclientinterface.h b/src/mongo/client/dbclientinterface.h
index a1a653fc9ef..06494f61e8c 100644
--- a/src/mongo/client/dbclientinterface.h
+++ b/src/mongo/client/dbclientinterface.h
@@ -294,6 +294,13 @@ public:
return runCommandWithTarget(std::move(request)).first;
}
+ /**
+ * Runs the specified command request in fire-and-forget mode and returns the connection that
+ * the command was actually sent on. If the connection doesn't support OP_MSG, the request will
+ * be run as a normal two-way command and the reply will be ignored after parsing.
+ */
+ virtual DBClientBase* runFireAndForgetCommand(OpMsgRequest request);
+
/** Run a database command. Database commands are represented as BSON objects. Common database
commands have prebuilt helper functions -- see below. If a helper is not available you can
directly call runCommand.
diff --git a/src/mongo/tools/bridge.cpp b/src/mongo/tools/bridge.cpp
index 07942a6a61b..5f9eac1e4e0 100644
--- a/src/mongo/tools/bridge.cpp
+++ b/src/mongo/tools/bridge.cpp
@@ -150,6 +150,8 @@ public:
request = std::move(swm.getValue());
}
+ const bool isFireAndForgetCommand = OpMsg::isFlagSet(request, OpMsg::kMoreToCome);
+
boost::optional<OpMsgRequest> cmdRequest;
if ((request.operation() == dbQuery &&
NamespaceString(DbMessage(request).getns()).isCommand()) ||
@@ -170,6 +172,8 @@ public:
// The 'request' is consumed by the mongobridge and does not get forwarded to
// 'dest'.
if (auto status = maybeProcessBridgeCommand(cmdRequest)) {
+ invariant(!isFireAndForgetCommand);
+
auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(request));
BSONObj metadata;
BSONObj reply;
@@ -222,8 +226,9 @@ public:
// Send the message we received from '_mp' to 'dest'. 'dest' returns a response for
// OP_QUERY, OP_GET_MORE, and OP_COMMAND messages that we respond back to
// '_mp' with.
- if (request.operation() == dbQuery || request.operation() == dbGetMore ||
- request.operation() == dbCommand || request.operation() == dbMsg) {
+ if (!isFireAndForgetCommand &&
+ (request.operation() == dbQuery || request.operation() == dbGetMore ||
+ request.operation() == dbCommand || request.operation() == dbMsg)) {
// TODO dbMsg moreToCome
// Forward the message to 'dest' and receive its reply in 'response'.
response.reset();