diff options
Diffstat (limited to 'src/mongo/executor/network_interface_asio.cpp')
-rw-r--r-- | src/mongo/executor/network_interface_asio.cpp | 37 |
1 files changed, 25 insertions, 12 deletions
diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp index 8baa3506941..e766eec28cf 100644 --- a/src/mongo/executor/network_interface_asio.cpp +++ b/src/mongo/executor/network_interface_asio.cpp @@ -33,6 +33,7 @@ #include "mongo/executor/network_interface_asio.h" #include <chrono> +#include <utility> #include "mongo/bson/bsonobj.h" #include "mongo/db/client.h" @@ -50,6 +51,14 @@ using ResponseStatus = TaskExecutor::ResponseStatus; using asio::ip::tcp; using RemoteCommandCompletionFn = stdx::function<void(const ResponseStatus&)>; +NetworkInterfaceASIO::AsyncConnection::AsyncConnection( + ConnectionPool::ConnectionPtr&& bootstrapConn, asio::ip::tcp::socket&& sock) + : _bootstrapConn{std::move(bootstrapConn)}, _sock(std::move(sock)) {} + +tcp::socket* NetworkInterfaceASIO::AsyncConnection::sock() { + return &_sock; +} + NetworkInterfaceASIO::AsyncOp::AsyncOp(const TaskExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish, @@ -63,28 +72,36 @@ NetworkInterfaceASIO::AsyncOp::AsyncOp(const TaskExecutor::CallbackHandle& cbHan _canceled(0), _id(id) {} + void NetworkInterfaceASIO::AsyncOp::connect(ConnectionPool* const pool, asio::io_service* service, Date_t now) { - _conn = stdx::make_unique<ConnectionPool::ConnectionPtr>( - pool, _request.target, now, Milliseconds(1000)); + // TODO, why is this hardcoded to 1 second? That seems too low. + ConnectionPool::ConnectionPtr conn(pool, _request.target, now, Milliseconds(1000)); _state = OpState::kConnectionAcquired; // TODO: Add a case here for unix domain sockets. - int protocol = _conn->get()->port().localAddr().getType(); + int protocol = conn.get()->port().localAddr().getType(); if (protocol != AF_INET && protocol != AF_INET6) { throw SocketException(SocketException::CONNECT_ERROR, "Unsupported family"); } _state = OpState::kConnectionVerified; - _sock = stdx::make_unique<tcp::socket>( - *service, protocol == AF_INET ? tcp::v4() : tcp::v6(), _conn->get()->port().psock->rawFD()); + tcp::socket sock{ + *service, protocol == AF_INET ? tcp::v4() : tcp::v6(), conn.get()->port().psock->rawFD()}; + + _connection.emplace(std::move(conn), std::move(sock)); _state = OpState::kConnected; } +NetworkInterfaceASIO::AsyncConnection* NetworkInterfaceASIO::AsyncOp::connection() { + invariant(_connection.is_initialized()); + return _connection.get_ptr(); +} + std::string NetworkInterfaceASIO::AsyncOp::toString() const { str::stream output; output << "op number: " << _id; @@ -155,10 +172,6 @@ void NetworkInterfaceASIO::AsyncOp::setOutput(const BSONObj& bson) { _output = bson; } -tcp::socket* NetworkInterfaceASIO::AsyncOp::sock() { - return _sock.get(); -} - Date_t NetworkInterfaceASIO::AsyncOp::start() const { return _start; } @@ -213,7 +226,7 @@ void NetworkInterfaceASIO::_messageFromRequest(const RemoteCommandRequest& reque } void NetworkInterfaceASIO::_asyncSendSimpleMessage(AsyncOp* op, const asio::const_buffer& buf) { - asio::async_write(*(op->sock()), + asio::async_write(*(op->connection()->sock()), asio::buffer(buf), [this, op](std::error_code ec, std::size_t bytes) { if (ec) @@ -272,7 +285,7 @@ void NetworkInterfaceASIO::_recvMessageBody(AsyncOp* op) { invariant(bodyLength >= 0); // receive remaining data into md->data - asio::async_read(*(op->sock()), + asio::async_read(*(op->connection()->sock()), asio::buffer(mdView.data(), bodyLength), [this, op, mdView](asio::error_code ec, size_t bytes) { if (ec) { @@ -288,7 +301,7 @@ void NetworkInterfaceASIO::_recvMessageBody(AsyncOp* op) { } void NetworkInterfaceASIO::_recvMessageHeader(AsyncOp* op) { - asio::async_read(*(op->sock()), + asio::async_read(*(op->connection()->sock()), asio::buffer(reinterpret_cast<char*>(op->header()), sizeof(MSGHEADER::Value)), [this, op](asio::error_code ec, size_t bytes) { if (ec) { |