diff options
author | Mathias Stearn <mathias@10gen.com> | 2017-07-24 14:53:55 -0400 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2017-08-17 22:13:09 -0400 |
commit | 5f85627971603bf9f5c832f9d4ca2808b31b0efd (patch) | |
tree | 57cb296a3e55f20722e19bc0d80e1ad2b1f8fb9a /src/mongo/client/dbclient.cpp | |
parent | 5aa60782cebed045a316b5cd6706be726178ce09 (diff) | |
download | mongo-5f85627971603bf9f5c832f9d4ca2808b31b0efd.tar.gz |
SERVER-28964 Close connection after seeing invalid OP_MSG flags
Diffstat (limited to 'src/mongo/client/dbclient.cpp')
-rw-r--r-- | src/mongo/client/dbclient.cpp | 74 |
1 files changed, 49 insertions, 25 deletions
diff --git a/src/mongo/client/dbclient.cpp b/src/mongo/client/dbclient.cpp index 6f57372e40e..82b4ee65a82 100644 --- a/src/mongo/client/dbclient.cpp +++ b/src/mongo/client/dbclient.cpp @@ -170,6 +170,24 @@ const rpc::ReplyMetadataReader& DBClientBase::getReplyMetadataReader() { return _metadataReader; } +rpc::UniqueReply DBClientBase::parseCommandReplyMessage(const std::string& host, + const Message& replyMsg) { + auto commandReply = rpc::makeReply(&replyMsg); + + if (_metadataReader) { + auto opCtx = haveClient() ? cc().getOperationContext() : nullptr; + uassertStatusOK(_metadataReader(opCtx, commandReply->getMetadata(), host)); + } + + if (ErrorCodes::SendStaleConfig == + getStatusFromCommandResult(commandReply->getCommandReply())) { + throw RecvStaleConfigException("stale config in runCommand", + commandReply->getCommandReply()); + } + + return rpc::UniqueReply(replyMsg, std::move(commandReply)); +} + std::pair<rpc::UniqueReply, DBClientBase*> DBClientBase::runCommandWithTarget( OpMsgRequest request) { // Make sure to reconnect if needed before building our request, since the request depends on @@ -204,7 +222,7 @@ std::pair<rpc::UniqueReply, DBClientBase*> DBClientBase::runCommandWithTarget( << "' ", call(requestMsg, replyMsg, false, &host)); - auto commandReply = rpc::makeReply(&replyMsg); + auto commandReply = parseCommandReplyMessage(host, replyMsg); uassert(ErrorCodes::RPCProtocolNegotiationFailed, str::stream() << "Mismatched RPC protocols - request was '" @@ -215,17 +233,7 @@ std::pair<rpc::UniqueReply, DBClientBase*> DBClientBase::runCommandWithTarget( << "' ", rpc::protocolForMessage(requestMsg) == commandReply->getProtocol()); - if (_metadataReader) { - uassertStatusOK(_metadataReader(opCtx, commandReply->getMetadata(), host)); - } - - if (ErrorCodes::SendStaleConfig == - getStatusFromCommandResult(commandReply->getCommandReply())) { - throw RecvStaleConfigException("stale config in runCommand", - commandReply->getCommandReply()); - } - - return {rpc::UniqueReply(std::move(replyMsg), std::move(commandReply)), this}; + return {std::move(commandReply), this}; } std::tuple<bool, DBClientBase*> DBClientBase::runCommandWithTarget(const string& dbname, @@ -939,6 +947,19 @@ std::pair<rpc::UniqueReply, DBClientBase*> DBClientConnection::runCommandWithTar return out; } +rpc::UniqueReply DBClientConnection::parseCommandReplyMessage(const std::string& host, + const Message& replyMsg) { + try { + return DBClientBase::parseCommandReplyMessage(host, std::move(replyMsg)); + } catch (const DBException& ex) { + if (ErrorCodes::isConnectionFatalMessageParseError(ex.code())) { + _port->shutdown(); + _failed = true; + } + throw; + } +} + void DBClientConnection::_checkConnection() { if (!_failed) return; @@ -1299,17 +1320,21 @@ bool DBClientConnection::recv(Message& m, int lastRequestId) { return false; } - uassert(40570, - "Response ID did not match the sent message ID.", - m.header().getResponseToMsgId() == lastRequestId); + try { + uassert(40570, + "Response ID did not match the sent message ID.", + m.header().getResponseToMsgId() == lastRequestId); + + if (m.operation() == dbCompressed) { + m = uassertStatusOK(_compressorManager.decompressMessage(m)); + } - if (m.operation() == dbCompressed) { - auto swm = _compressorManager.decompressMessage(m); - uassertStatusOK(swm.getStatus()); - m = std::move(swm.getValue()); + return true; + } catch (const DBException&) { + _failed = true; + _port->shutdown(); + throw; } - - return true; } bool DBClientConnection::call(Message& toSend, @@ -1337,12 +1362,11 @@ bool DBClientConnection::call(Message& toSend, } if (response.operation() == dbCompressed) { - auto swm = _compressorManager.decompressMessage(response); - uassertStatusOK(swm.getStatus()); - response = std::move(swm.getValue()); + response = uassertStatusOK(_compressorManager.decompressMessage(response)); } - } catch (SocketException&) { + } catch (const DBException& ex) { _failed = true; + _port->shutdown(); throw; } return true; |