summaryrefslogtreecommitdiff
path: root/src/mongo/client/dbclient.cpp
diff options
context:
space:
mode:
authorMathias Stearn <mathias@10gen.com>2017-07-24 14:53:55 -0400
committerMathias Stearn <mathias@10gen.com>2017-08-17 22:13:09 -0400
commit5f85627971603bf9f5c832f9d4ca2808b31b0efd (patch)
tree57cb296a3e55f20722e19bc0d80e1ad2b1f8fb9a /src/mongo/client/dbclient.cpp
parent5aa60782cebed045a316b5cd6706be726178ce09 (diff)
downloadmongo-5f85627971603bf9f5c832f9d4ca2808b31b0efd.tar.gz
SERVER-28964 Close connection after seeing invalid OP_MSG flags
Diffstat (limited to 'src/mongo/client/dbclient.cpp')
-rw-r--r--src/mongo/client/dbclient.cpp74
1 files changed, 49 insertions, 25 deletions
diff --git a/src/mongo/client/dbclient.cpp b/src/mongo/client/dbclient.cpp
index 6f57372e40e..82b4ee65a82 100644
--- a/src/mongo/client/dbclient.cpp
+++ b/src/mongo/client/dbclient.cpp
@@ -170,6 +170,24 @@ const rpc::ReplyMetadataReader& DBClientBase::getReplyMetadataReader() {
return _metadataReader;
}
+rpc::UniqueReply DBClientBase::parseCommandReplyMessage(const std::string& host,
+ const Message& replyMsg) {
+ auto commandReply = rpc::makeReply(&replyMsg);
+
+ if (_metadataReader) {
+ auto opCtx = haveClient() ? cc().getOperationContext() : nullptr;
+ uassertStatusOK(_metadataReader(opCtx, commandReply->getMetadata(), host));
+ }
+
+ if (ErrorCodes::SendStaleConfig ==
+ getStatusFromCommandResult(commandReply->getCommandReply())) {
+ throw RecvStaleConfigException("stale config in runCommand",
+ commandReply->getCommandReply());
+ }
+
+ return rpc::UniqueReply(replyMsg, std::move(commandReply));
+}
+
std::pair<rpc::UniqueReply, DBClientBase*> DBClientBase::runCommandWithTarget(
OpMsgRequest request) {
// Make sure to reconnect if needed before building our request, since the request depends on
@@ -204,7 +222,7 @@ std::pair<rpc::UniqueReply, DBClientBase*> DBClientBase::runCommandWithTarget(
<< "' ",
call(requestMsg, replyMsg, false, &host));
- auto commandReply = rpc::makeReply(&replyMsg);
+ auto commandReply = parseCommandReplyMessage(host, replyMsg);
uassert(ErrorCodes::RPCProtocolNegotiationFailed,
str::stream() << "Mismatched RPC protocols - request was '"
@@ -215,17 +233,7 @@ std::pair<rpc::UniqueReply, DBClientBase*> DBClientBase::runCommandWithTarget(
<< "' ",
rpc::protocolForMessage(requestMsg) == commandReply->getProtocol());
- if (_metadataReader) {
- uassertStatusOK(_metadataReader(opCtx, commandReply->getMetadata(), host));
- }
-
- if (ErrorCodes::SendStaleConfig ==
- getStatusFromCommandResult(commandReply->getCommandReply())) {
- throw RecvStaleConfigException("stale config in runCommand",
- commandReply->getCommandReply());
- }
-
- return {rpc::UniqueReply(std::move(replyMsg), std::move(commandReply)), this};
+ return {std::move(commandReply), this};
}
std::tuple<bool, DBClientBase*> DBClientBase::runCommandWithTarget(const string& dbname,
@@ -939,6 +947,19 @@ std::pair<rpc::UniqueReply, DBClientBase*> DBClientConnection::runCommandWithTar
return out;
}
+rpc::UniqueReply DBClientConnection::parseCommandReplyMessage(const std::string& host,
+ const Message& replyMsg) {
+ try {
+ return DBClientBase::parseCommandReplyMessage(host, std::move(replyMsg));
+ } catch (const DBException& ex) {
+ if (ErrorCodes::isConnectionFatalMessageParseError(ex.code())) {
+ _port->shutdown();
+ _failed = true;
+ }
+ throw;
+ }
+}
+
void DBClientConnection::_checkConnection() {
if (!_failed)
return;
@@ -1299,17 +1320,21 @@ bool DBClientConnection::recv(Message& m, int lastRequestId) {
return false;
}
- uassert(40570,
- "Response ID did not match the sent message ID.",
- m.header().getResponseToMsgId() == lastRequestId);
+ try {
+ uassert(40570,
+ "Response ID did not match the sent message ID.",
+ m.header().getResponseToMsgId() == lastRequestId);
+
+ if (m.operation() == dbCompressed) {
+ m = uassertStatusOK(_compressorManager.decompressMessage(m));
+ }
- if (m.operation() == dbCompressed) {
- auto swm = _compressorManager.decompressMessage(m);
- uassertStatusOK(swm.getStatus());
- m = std::move(swm.getValue());
+ return true;
+ } catch (const DBException&) {
+ _failed = true;
+ _port->shutdown();
+ throw;
}
-
- return true;
}
bool DBClientConnection::call(Message& toSend,
@@ -1337,12 +1362,11 @@ bool DBClientConnection::call(Message& toSend,
}
if (response.operation() == dbCompressed) {
- auto swm = _compressorManager.decompressMessage(response);
- uassertStatusOK(swm.getStatus());
- response = std::move(swm.getValue());
+ response = uassertStatusOK(_compressorManager.decompressMessage(response));
}
- } catch (SocketException&) {
+ } catch (const DBException& ex) {
_failed = true;
+ _port->shutdown();
throw;
}
return true;