diff options
author | Adam Midvidy <amidvidy@gmail.com> | 2015-07-07 15:39:58 -0400 |
---|---|---|
committer | Adam Midvidy <amidvidy@gmail.com> | 2015-07-09 13:17:01 -0400 |
commit | 331678a191362dfb79a0243fee7b5f9f619ae338 (patch) | |
tree | 4815611744d5728c8c190c2c252caa823753b83d /src/mongo | |
parent | 3edf8b672c46e56f4d2e95763a6c0ec6fbbd9018 (diff) | |
download | mongo-331678a191362dfb79a0243fee7b5f9f619ae338.tar.gz |
SERVER-19156 support OP_COMMAND in NetworkInterfaceASIO
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/executor/network_interface_asio.h | 45 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_asio_command.cpp | 87 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_asio_connect.cpp | 25 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_asio_operation.cpp | 20 | ||||
-rw-r--r-- | src/mongo/rpc/factory.cpp | 6 | ||||
-rw-r--r-- | src/mongo/rpc/factory.h | 2 | ||||
-rw-r--r-- | src/mongo/util/net/message.h | 2 |
7 files changed, 143 insertions, 44 deletions
diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h index dfdf92d0551..add90aa2c0a 100644 --- a/src/mongo/executor/network_interface_asio.h +++ b/src/mongo/executor/network_interface_asio.h @@ -30,6 +30,7 @@ #include <asio.hpp> #include <boost/optional.hpp> +#include <memory> #include <string> #include <system_error> #include <unordered_map> @@ -81,10 +82,17 @@ private: */ class AsyncConnection { public: - AsyncConnection(asio::ip::tcp::socket&& sock, rpc::ProtocolSet protocols); + AsyncConnection(asio::ip::tcp::socket&& sock, rpc::ProtocolSet serverProtocols); + + AsyncConnection(asio::ip::tcp::socket&& sock, + rpc::ProtocolSet serverProtocols, + boost::optional<ConnectionPool::ConnectionPtr>&& bootstrapConn); asio::ip::tcp::socket& sock(); + rpc::ProtocolSet serverProtocols() const; + rpc::ProtocolSet clientProtocols() const; + // Explicit move construction and assignment to support MSVC #if defined(_MSC_VER) && _MSC_VER < 1900 AsyncConnection(AsyncConnection&&); @@ -96,7 +104,16 @@ private: private: asio::ip::tcp::socket _sock; - rpc::ProtocolSet _protocols; + + rpc::ProtocolSet _serverProtocols; + rpc::ProtocolSet _clientProtocols{rpc::supports::kAll}; + + /** + * The bootstrap connection we use to run auth. This will eventually go away when we finish + * implementing async auth, but for now we need to keep it alive so that the socket it + * creates stays open. + */ + boost::optional<ConnectionPool::ConnectionPtr> _bootstrapConn; }; /** @@ -132,8 +149,15 @@ private: Date_t start() const; Message* toSend(); + + void setToSend(Message&& message); + Message* toRecv(); + rpc::Protocol operationProtocol() const; + + void setOperationProtocol(rpc::Protocol proto); + private: enum class OpState { kReady, @@ -154,12 +178,22 @@ private: */ boost::optional<AsyncConnection> _connection; + /** + * The RPC protocol used for this operation. We wrap it in an optional as it + * is not known until we obtain a connection. + */ + boost::optional<rpc::Protocol> _operationProtocol; + const Date_t _start; OpState _state; AtomicUInt64 _canceled; - Message _toSend; + /** + * The outgoing command associated with this operation. + */ + boost::optional<Message> _toSend; + Message _toRecv; MSGHEADER::Value _header; @@ -168,9 +202,8 @@ private: void _asyncRunCommand(AsyncOp* op); - void _messageFromRequest(const RemoteCommandRequest& request, - Message* toSend, - bool useOpCommand = false); + std::unique_ptr<Message> _messageFromRequest(const RemoteCommandRequest& request, + rpc::Protocol protocol); void _asyncSendSimpleMessage(AsyncOp* op, const asio::const_buffer& buf); diff --git a/src/mongo/executor/network_interface_asio_command.cpp b/src/mongo/executor/network_interface_asio_command.cpp index c7e9d15caf6..94d1fdbd5ca 100644 --- a/src/mongo/executor/network_interface_asio_command.cpp +++ b/src/mongo/executor/network_interface_asio_command.cpp @@ -36,7 +36,11 @@ #include "mongo/db/dbmessage.h" #include "mongo/db/jsobj.h" +#include "mongo/rpc/factory.h" +#include "mongo/rpc/request_builder_interface.h" #include "mongo/util/log.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/mongoutils/str.h" namespace mongo { namespace executor { @@ -56,28 +60,23 @@ void NetworkInterfaceASIO::_asyncRunCommand(AsyncOp* op) { _connectWithDBClientConnection(op); } -void NetworkInterfaceASIO::_messageFromRequest(const RemoteCommandRequest& request, - Message* toSend, - bool useOpCommand) { +std::unique_ptr<Message> NetworkInterfaceASIO::_messageFromRequest( + const RemoteCommandRequest& request, rpc::Protocol protocol) { BSONObj query = request.cmdObj; - invariant(query.isValid()); + auto requestBuilder = rpc::makeRequestBuilder(protocol); - // TODO: Once OP_COMMAND work is complete, - // look at client to see if it supports OP_COMMAND. + // TODO: handle metadata writers + auto toSend = rpc::makeRequestBuilder(protocol) + ->setDatabase(request.dbname) + .setCommandName(request.cmdObj.firstElementFieldName()) + .setMetadata(request.metadata) + .setCommandArgs(request.cmdObj) + .done(); - // TODO: Investigate whether we can use CommandRequestBuilder here. - - BufBuilder b; - b.appendNum(0); // opts - b.appendStr(request.dbname + ".$cmd"); - b.appendNum(0); // toSkip - b.appendNum(1); // toReturn, don't care about responses - query.appendSelfToBufBuilder(b); - - // TODO: If AsyncOp can own this buffer, we can avoid copying it in setData(). - toSend->setData(dbQuery, b.buf(), b.len()); toSend->header().setId(nextMessageId()); toSend->header().setResponseTo(0); + + return toSend; } void NetworkInterfaceASIO::_asyncSendSimpleMessage(AsyncOp* op, const asio::const_buffer& buf) { @@ -102,32 +101,60 @@ void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) { return _completeOperation(op, kCanceledStatus); } - Message* toSend = op->toSend(); - _messageFromRequest(op->request(), toSend); + auto negotiatedProtocol = + rpc::negotiate(op->connection()->serverProtocols(), op->connection()->clientProtocols()); + + if (!negotiatedProtocol.isOK()) { + return _completeOperation(op, negotiatedProtocol.getStatus()); + } + + op->setOperationProtocol(negotiatedProtocol.getValue()); + + op->setToSend(std::move(*_messageFromRequest(op->request(), negotiatedProtocol.getValue()))); - if (toSend->empty()) + // TODO: Is this logic actually necessary (SERVER-19320)? + if (op->toSend()->empty()) return _completedWriteCallback(op); // TODO: Some day we may need to support vector messages. - fassert(28708, toSend->buf() != 0); - asio::const_buffer buf(toSend->buf(), toSend->size()); + fassert(28708, op->toSend()->buf() != 0); + asio::const_buffer buf(op->toSend()->buf(), op->toSend()->size()); return _asyncSendSimpleMessage(op, buf); } void NetworkInterfaceASIO::_completedWriteCallback(AsyncOp* op) { // If we were told to send an empty message, toRecv will be empty here. - // TODO: handle metadata SERVER-19156 - BSONObj commandReply; + // TODO: handle metadata readers + const auto elapsed = [this, op]() { return now() - op->start(); }; + if (op->toRecv()->empty()) { LOG(3) << "received an empty message"; - } else { - QueryResult::View qr = op->toRecv()->singleData().view2ptr(); - // unavoidable copy - commandReply = BSONObj(qr.data()).getOwned(); + return _completeOperation(op, RemoteCommandResponse(BSONObj(), BSONObj(), elapsed())); + } + + try { + auto reply = rpc::makeReply(op->toRecv()); + + if (reply->getProtocol() != op->operationProtocol()) { + return _completeOperation(op, + Status(ErrorCodes::RPCProtocolNegotiationFailed, + str::stream() + << "Mismatched RPC protocols - request was '" + << opToString(op->toSend()->operation()) << "' '" + << " but reply was '" + << opToString(op->toRecv()->operation()) << "'")); + } + + _completeOperation(op, + // unavoidable copy + RemoteCommandResponse(reply->getCommandReply().getOwned(), + reply->getMetadata().getOwned(), + elapsed())); + } catch (...) { + // makeReply can throw if the reply was invalid. + _completeOperation(op, exceptionToStatus()); } - _completeOperation( - op, RemoteCommandResponse(std::move(commandReply), BSONObj(), now() - op->start())); } void NetworkInterfaceASIO::_networkErrorCallback(AsyncOp* op, const std::error_code& ec) { diff --git a/src/mongo/executor/network_interface_asio_connect.cpp b/src/mongo/executor/network_interface_asio_connect.cpp index bf2dcb76457..1eded832ed6 100644 --- a/src/mongo/executor/network_interface_asio_connect.cpp +++ b/src/mongo/executor/network_interface_asio_connect.cpp @@ -48,16 +48,27 @@ const auto kCanceledStatus = Status(ErrorCodes::CallbackCanceled, "Callback canc NetworkInterfaceASIO::AsyncConnection::AsyncConnection(asio::ip::tcp::socket&& sock, rpc::ProtocolSet protocols) - : _sock(std::move(sock)), _protocols(protocols) {} + : AsyncConnection(std::move(sock), protocols, boost::none) {} + +NetworkInterfaceASIO::AsyncConnection::AsyncConnection( + asio::ip::tcp::socket&& sock, + rpc::ProtocolSet protocols, + boost::optional<ConnectionPool::ConnectionPtr>&& bootstrapConn) + : _sock(std::move(sock)), + _serverProtocols(protocols), + _bootstrapConn(std::move(bootstrapConn)) {} #if defined(_MSC_VER) && _MSC_VER < 1900 NetworkInterfaceASIO::AsyncConnection::AsyncConnection(AsyncConnection&& other) - : _sock(std::move(other._sock)), _protocols(other._protocols) {} + : _sock(std::move(other._sock)), + _serverProtocols(other._serverProtocols), + _clientProtocols(other._clientProtocols) {} NetworkInterfaceASIO::AsyncConnection& NetworkInterfaceASIO::AsyncConnection::operator=( AsyncConnection&& other) { _sock = std::move(other._sock); - _protocols = other._protocols; + _serverProtocols = other._serverProtocols; + _clientProtocols = other._clientProtocols; return *this; } #endif @@ -66,6 +77,14 @@ asio::ip::tcp::socket& NetworkInterfaceASIO::AsyncConnection::sock() { return _sock; } +rpc::ProtocolSet NetworkInterfaceASIO::AsyncConnection::serverProtocols() const { + return _serverProtocols; +} + +rpc::ProtocolSet NetworkInterfaceASIO::AsyncConnection::clientProtocols() const { + return _clientProtocols; +} + void NetworkInterfaceASIO::_connectASIO(AsyncOp* op) { tcp::resolver::query query(op->request().target.host(), std::to_string(op->request().target.port())); diff --git a/src/mongo/executor/network_interface_asio_operation.cpp b/src/mongo/executor/network_interface_asio_operation.cpp index cfebb680141..1f96c4c7926 100644 --- a/src/mongo/executor/network_interface_asio_operation.cpp +++ b/src/mongo/executor/network_interface_asio_operation.cpp @@ -106,7 +106,7 @@ void NetworkInterfaceASIO::AsyncOp::connect(ConnectionPool* const pool, tcp::socket sock{ *service, protocol == AF_INET ? tcp::v4() : tcp::v6(), conn.get()->port().psock->rawFD()}; - _connection.emplace(std::move(sock), conn.get()->getServerRPCProtocols()); + _connection.emplace(std::move(sock), conn.get()->getServerRPCProtocols(), std::move(conn)); _state = OpState::kConnected; } @@ -147,12 +147,28 @@ Date_t NetworkInterfaceASIO::AsyncOp::start() const { } Message* NetworkInterfaceASIO::AsyncOp::toSend() { - return &_toSend; + invariant(_toSend.is_initialized()); + return _toSend.get_ptr(); +} + +void NetworkInterfaceASIO::AsyncOp::setToSend(Message&& message) { + invariant(!_toSend.is_initialized()); + _toSend = std::move(message); } Message* NetworkInterfaceASIO::AsyncOp::toRecv() { return &_toRecv; } +rpc::Protocol NetworkInterfaceASIO::AsyncOp::operationProtocol() const { + invariant(_operationProtocol.is_initialized()); + return *_operationProtocol; +} + +void NetworkInterfaceASIO::AsyncOp::setOperationProtocol(rpc::Protocol proto) { + invariant(!_operationProtocol.is_initialized()); + _operationProtocol = proto; +} + } // namespace executor } // namespace mongo diff --git a/src/mongo/rpc/factory.cpp b/src/mongo/rpc/factory.cpp index ae16d16c62e..29a0a03c6fa 100644 --- a/src/mongo/rpc/factory.cpp +++ b/src/mongo/rpc/factory.cpp @@ -49,7 +49,11 @@ namespace rpc { std::unique_ptr<RequestBuilderInterface> makeRequestBuilder(ProtocolSet clientProtos, ProtocolSet serverProtos) { - switch (uassertStatusOK(negotiate(clientProtos, serverProtos))) { + return makeRequestBuilder(uassertStatusOK(negotiate(clientProtos, serverProtos))); +} + +std::unique_ptr<RequestBuilderInterface> makeRequestBuilder(Protocol proto) { + switch (proto) { case Protocol::kOpQuery: return stdx::make_unique<LegacyRequestBuilder>(); case Protocol::kOpCommandV1: diff --git a/src/mongo/rpc/factory.h b/src/mongo/rpc/factory.h index 2776ffacc47..f2d4e2fd3cb 100644 --- a/src/mongo/rpc/factory.h +++ b/src/mongo/rpc/factory.h @@ -52,6 +52,8 @@ class RequestInterface; std::unique_ptr<RequestBuilderInterface> makeRequestBuilder(ProtocolSet clientProtos, ProtocolSet serverProtos); +std::unique_ptr<RequestBuilderInterface> makeRequestBuilder(Protocol proto); + /** * Returns the appropriate concrete Reply according to the contents of the message. * Throws if one cannot be chosen. diff --git a/src/mongo/util/net/message.h b/src/mongo/util/net/message.h index 9f96026f070..8444fc04c4d 100644 --- a/src/mongo/util/net/message.h +++ b/src/mongo/util/net/message.h @@ -412,8 +412,6 @@ public: _setData(buf, true); } - friend void swap(Message& other); - // vector swap() so this is fast Message& operator=(Message&& r) { verify(empty()); |