summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorAdam Midvidy <amidvidy@gmail.com>2015-07-07 15:39:58 -0400
committerAdam Midvidy <amidvidy@gmail.com>2015-07-09 13:17:01 -0400
commit331678a191362dfb79a0243fee7b5f9f619ae338 (patch)
tree4815611744d5728c8c190c2c252caa823753b83d /src/mongo
parent3edf8b672c46e56f4d2e95763a6c0ec6fbbd9018 (diff)
downloadmongo-331678a191362dfb79a0243fee7b5f9f619ae338.tar.gz
SERVER-19156 support OP_COMMAND in NetworkInterfaceASIO
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/executor/network_interface_asio.h45
-rw-r--r--src/mongo/executor/network_interface_asio_command.cpp87
-rw-r--r--src/mongo/executor/network_interface_asio_connect.cpp25
-rw-r--r--src/mongo/executor/network_interface_asio_operation.cpp20
-rw-r--r--src/mongo/rpc/factory.cpp6
-rw-r--r--src/mongo/rpc/factory.h2
-rw-r--r--src/mongo/util/net/message.h2
7 files changed, 143 insertions, 44 deletions
diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h
index dfdf92d0551..add90aa2c0a 100644
--- a/src/mongo/executor/network_interface_asio.h
+++ b/src/mongo/executor/network_interface_asio.h
@@ -30,6 +30,7 @@
#include <asio.hpp>
#include <boost/optional.hpp>
+#include <memory>
#include <string>
#include <system_error>
#include <unordered_map>
@@ -81,10 +82,17 @@ private:
*/
class AsyncConnection {
public:
- AsyncConnection(asio::ip::tcp::socket&& sock, rpc::ProtocolSet protocols);
+ AsyncConnection(asio::ip::tcp::socket&& sock, rpc::ProtocolSet serverProtocols);
+
+ AsyncConnection(asio::ip::tcp::socket&& sock,
+ rpc::ProtocolSet serverProtocols,
+ boost::optional<ConnectionPool::ConnectionPtr>&& bootstrapConn);
asio::ip::tcp::socket& sock();
+ rpc::ProtocolSet serverProtocols() const;
+ rpc::ProtocolSet clientProtocols() const;
+
// Explicit move construction and assignment to support MSVC
#if defined(_MSC_VER) && _MSC_VER < 1900
AsyncConnection(AsyncConnection&&);
@@ -96,7 +104,16 @@ private:
private:
asio::ip::tcp::socket _sock;
- rpc::ProtocolSet _protocols;
+
+ rpc::ProtocolSet _serverProtocols;
+ rpc::ProtocolSet _clientProtocols{rpc::supports::kAll};
+
+ /**
+ * The bootstrap connection we use to run auth. This will eventually go away when we finish
+ * implementing async auth, but for now we need to keep it alive so that the socket it
+ * creates stays open.
+ */
+ boost::optional<ConnectionPool::ConnectionPtr> _bootstrapConn;
};
/**
@@ -132,8 +149,15 @@ private:
Date_t start() const;
Message* toSend();
+
+ void setToSend(Message&& message);
+
Message* toRecv();
+ rpc::Protocol operationProtocol() const;
+
+ void setOperationProtocol(rpc::Protocol proto);
+
private:
enum class OpState {
kReady,
@@ -154,12 +178,22 @@ private:
*/
boost::optional<AsyncConnection> _connection;
+ /**
+ * The RPC protocol used for this operation. We wrap it in an optional as it
+ * is not known until we obtain a connection.
+ */
+ boost::optional<rpc::Protocol> _operationProtocol;
+
const Date_t _start;
OpState _state;
AtomicUInt64 _canceled;
- Message _toSend;
+ /**
+ * The outgoing command associated with this operation.
+ */
+ boost::optional<Message> _toSend;
+
Message _toRecv;
MSGHEADER::Value _header;
@@ -168,9 +202,8 @@ private:
void _asyncRunCommand(AsyncOp* op);
- void _messageFromRequest(const RemoteCommandRequest& request,
- Message* toSend,
- bool useOpCommand = false);
+ std::unique_ptr<Message> _messageFromRequest(const RemoteCommandRequest& request,
+ rpc::Protocol protocol);
void _asyncSendSimpleMessage(AsyncOp* op, const asio::const_buffer& buf);
diff --git a/src/mongo/executor/network_interface_asio_command.cpp b/src/mongo/executor/network_interface_asio_command.cpp
index c7e9d15caf6..94d1fdbd5ca 100644
--- a/src/mongo/executor/network_interface_asio_command.cpp
+++ b/src/mongo/executor/network_interface_asio_command.cpp
@@ -36,7 +36,11 @@
#include "mongo/db/dbmessage.h"
#include "mongo/db/jsobj.h"
+#include "mongo/rpc/factory.h"
+#include "mongo/rpc/request_builder_interface.h"
#include "mongo/util/log.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/mongoutils/str.h"
namespace mongo {
namespace executor {
@@ -56,28 +60,23 @@ void NetworkInterfaceASIO::_asyncRunCommand(AsyncOp* op) {
_connectWithDBClientConnection(op);
}
-void NetworkInterfaceASIO::_messageFromRequest(const RemoteCommandRequest& request,
- Message* toSend,
- bool useOpCommand) {
+std::unique_ptr<Message> NetworkInterfaceASIO::_messageFromRequest(
+ const RemoteCommandRequest& request, rpc::Protocol protocol) {
BSONObj query = request.cmdObj;
- invariant(query.isValid());
+ auto requestBuilder = rpc::makeRequestBuilder(protocol);
- // TODO: Once OP_COMMAND work is complete,
- // look at client to see if it supports OP_COMMAND.
+ // TODO: handle metadata writers
+ auto toSend = rpc::makeRequestBuilder(protocol)
+ ->setDatabase(request.dbname)
+ .setCommandName(request.cmdObj.firstElementFieldName())
+ .setMetadata(request.metadata)
+ .setCommandArgs(request.cmdObj)
+ .done();
- // TODO: Investigate whether we can use CommandRequestBuilder here.
-
- BufBuilder b;
- b.appendNum(0); // opts
- b.appendStr(request.dbname + ".$cmd");
- b.appendNum(0); // toSkip
- b.appendNum(1); // toReturn, don't care about responses
- query.appendSelfToBufBuilder(b);
-
- // TODO: If AsyncOp can own this buffer, we can avoid copying it in setData().
- toSend->setData(dbQuery, b.buf(), b.len());
toSend->header().setId(nextMessageId());
toSend->header().setResponseTo(0);
+
+ return toSend;
}
void NetworkInterfaceASIO::_asyncSendSimpleMessage(AsyncOp* op, const asio::const_buffer& buf) {
@@ -102,32 +101,60 @@ void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) {
return _completeOperation(op, kCanceledStatus);
}
- Message* toSend = op->toSend();
- _messageFromRequest(op->request(), toSend);
+ auto negotiatedProtocol =
+ rpc::negotiate(op->connection()->serverProtocols(), op->connection()->clientProtocols());
+
+ if (!negotiatedProtocol.isOK()) {
+ return _completeOperation(op, negotiatedProtocol.getStatus());
+ }
+
+ op->setOperationProtocol(negotiatedProtocol.getValue());
+
+ op->setToSend(std::move(*_messageFromRequest(op->request(), negotiatedProtocol.getValue())));
- if (toSend->empty())
+ // TODO: Is this logic actually necessary (SERVER-19320)?
+ if (op->toSend()->empty())
return _completedWriteCallback(op);
// TODO: Some day we may need to support vector messages.
- fassert(28708, toSend->buf() != 0);
- asio::const_buffer buf(toSend->buf(), toSend->size());
+ fassert(28708, op->toSend()->buf() != 0);
+ asio::const_buffer buf(op->toSend()->buf(), op->toSend()->size());
return _asyncSendSimpleMessage(op, buf);
}
void NetworkInterfaceASIO::_completedWriteCallback(AsyncOp* op) {
// If we were told to send an empty message, toRecv will be empty here.
- // TODO: handle metadata SERVER-19156
- BSONObj commandReply;
+ // TODO: handle metadata readers
+ const auto elapsed = [this, op]() { return now() - op->start(); };
+
if (op->toRecv()->empty()) {
LOG(3) << "received an empty message";
- } else {
- QueryResult::View qr = op->toRecv()->singleData().view2ptr();
- // unavoidable copy
- commandReply = BSONObj(qr.data()).getOwned();
+ return _completeOperation(op, RemoteCommandResponse(BSONObj(), BSONObj(), elapsed()));
+ }
+
+ try {
+ auto reply = rpc::makeReply(op->toRecv());
+
+ if (reply->getProtocol() != op->operationProtocol()) {
+ return _completeOperation(op,
+ Status(ErrorCodes::RPCProtocolNegotiationFailed,
+ str::stream()
+ << "Mismatched RPC protocols - request was '"
+ << opToString(op->toSend()->operation()) << "' '"
+ << " but reply was '"
+ << opToString(op->toRecv()->operation()) << "'"));
+ }
+
+ _completeOperation(op,
+ // unavoidable copy
+ RemoteCommandResponse(reply->getCommandReply().getOwned(),
+ reply->getMetadata().getOwned(),
+ elapsed()));
+ } catch (...) {
+ // makeReply can throw if the reply was invalid.
+ _completeOperation(op, exceptionToStatus());
}
- _completeOperation(
- op, RemoteCommandResponse(std::move(commandReply), BSONObj(), now() - op->start()));
}
void NetworkInterfaceASIO::_networkErrorCallback(AsyncOp* op, const std::error_code& ec) {
diff --git a/src/mongo/executor/network_interface_asio_connect.cpp b/src/mongo/executor/network_interface_asio_connect.cpp
index bf2dcb76457..1eded832ed6 100644
--- a/src/mongo/executor/network_interface_asio_connect.cpp
+++ b/src/mongo/executor/network_interface_asio_connect.cpp
@@ -48,16 +48,27 @@ const auto kCanceledStatus = Status(ErrorCodes::CallbackCanceled, "Callback canc
NetworkInterfaceASIO::AsyncConnection::AsyncConnection(asio::ip::tcp::socket&& sock,
rpc::ProtocolSet protocols)
- : _sock(std::move(sock)), _protocols(protocols) {}
+ : AsyncConnection(std::move(sock), protocols, boost::none) {}
+
+NetworkInterfaceASIO::AsyncConnection::AsyncConnection(
+ asio::ip::tcp::socket&& sock,
+ rpc::ProtocolSet protocols,
+ boost::optional<ConnectionPool::ConnectionPtr>&& bootstrapConn)
+ : _sock(std::move(sock)),
+ _serverProtocols(protocols),
+ _bootstrapConn(std::move(bootstrapConn)) {}
#if defined(_MSC_VER) && _MSC_VER < 1900
NetworkInterfaceASIO::AsyncConnection::AsyncConnection(AsyncConnection&& other)
- : _sock(std::move(other._sock)), _protocols(other._protocols) {}
+ : _sock(std::move(other._sock)),
+ _serverProtocols(other._serverProtocols),
+ _clientProtocols(other._clientProtocols) {}
NetworkInterfaceASIO::AsyncConnection& NetworkInterfaceASIO::AsyncConnection::operator=(
AsyncConnection&& other) {
_sock = std::move(other._sock);
- _protocols = other._protocols;
+ _serverProtocols = other._serverProtocols;
+ _clientProtocols = other._clientProtocols;
return *this;
}
#endif
@@ -66,6 +77,14 @@ asio::ip::tcp::socket& NetworkInterfaceASIO::AsyncConnection::sock() {
return _sock;
}
+rpc::ProtocolSet NetworkInterfaceASIO::AsyncConnection::serverProtocols() const {
+ return _serverProtocols;
+}
+
+rpc::ProtocolSet NetworkInterfaceASIO::AsyncConnection::clientProtocols() const {
+ return _clientProtocols;
+}
+
void NetworkInterfaceASIO::_connectASIO(AsyncOp* op) {
tcp::resolver::query query(op->request().target.host(),
std::to_string(op->request().target.port()));
diff --git a/src/mongo/executor/network_interface_asio_operation.cpp b/src/mongo/executor/network_interface_asio_operation.cpp
index cfebb680141..1f96c4c7926 100644
--- a/src/mongo/executor/network_interface_asio_operation.cpp
+++ b/src/mongo/executor/network_interface_asio_operation.cpp
@@ -106,7 +106,7 @@ void NetworkInterfaceASIO::AsyncOp::connect(ConnectionPool* const pool,
tcp::socket sock{
*service, protocol == AF_INET ? tcp::v4() : tcp::v6(), conn.get()->port().psock->rawFD()};
- _connection.emplace(std::move(sock), conn.get()->getServerRPCProtocols());
+ _connection.emplace(std::move(sock), conn.get()->getServerRPCProtocols(), std::move(conn));
_state = OpState::kConnected;
}
@@ -147,12 +147,28 @@ Date_t NetworkInterfaceASIO::AsyncOp::start() const {
}
Message* NetworkInterfaceASIO::AsyncOp::toSend() {
- return &_toSend;
+ invariant(_toSend.is_initialized());
+ return _toSend.get_ptr();
+}
+
+void NetworkInterfaceASIO::AsyncOp::setToSend(Message&& message) {
+ invariant(!_toSend.is_initialized());
+ _toSend = std::move(message);
}
Message* NetworkInterfaceASIO::AsyncOp::toRecv() {
return &_toRecv;
}
+rpc::Protocol NetworkInterfaceASIO::AsyncOp::operationProtocol() const {
+ invariant(_operationProtocol.is_initialized());
+ return *_operationProtocol;
+}
+
+void NetworkInterfaceASIO::AsyncOp::setOperationProtocol(rpc::Protocol proto) {
+ invariant(!_operationProtocol.is_initialized());
+ _operationProtocol = proto;
+}
+
} // namespace executor
} // namespace mongo
diff --git a/src/mongo/rpc/factory.cpp b/src/mongo/rpc/factory.cpp
index ae16d16c62e..29a0a03c6fa 100644
--- a/src/mongo/rpc/factory.cpp
+++ b/src/mongo/rpc/factory.cpp
@@ -49,7 +49,11 @@ namespace rpc {
std::unique_ptr<RequestBuilderInterface> makeRequestBuilder(ProtocolSet clientProtos,
ProtocolSet serverProtos) {
- switch (uassertStatusOK(negotiate(clientProtos, serverProtos))) {
+ return makeRequestBuilder(uassertStatusOK(negotiate(clientProtos, serverProtos)));
+}
+
+std::unique_ptr<RequestBuilderInterface> makeRequestBuilder(Protocol proto) {
+ switch (proto) {
case Protocol::kOpQuery:
return stdx::make_unique<LegacyRequestBuilder>();
case Protocol::kOpCommandV1:
diff --git a/src/mongo/rpc/factory.h b/src/mongo/rpc/factory.h
index 2776ffacc47..f2d4e2fd3cb 100644
--- a/src/mongo/rpc/factory.h
+++ b/src/mongo/rpc/factory.h
@@ -52,6 +52,8 @@ class RequestInterface;
std::unique_ptr<RequestBuilderInterface> makeRequestBuilder(ProtocolSet clientProtos,
ProtocolSet serverProtos);
+std::unique_ptr<RequestBuilderInterface> makeRequestBuilder(Protocol proto);
+
/**
* Returns the appropriate concrete Reply according to the contents of the message.
* Throws if one cannot be chosen.
diff --git a/src/mongo/util/net/message.h b/src/mongo/util/net/message.h
index 9f96026f070..8444fc04c4d 100644
--- a/src/mongo/util/net/message.h
+++ b/src/mongo/util/net/message.h
@@ -412,8 +412,6 @@ public:
_setData(buf, true);
}
- friend void swap(Message& other);
-
// vector swap() so this is fast
Message& operator=(Message&& r) {
verify(empty());