diff options
author | Adam Midvidy <amidvidy@gmail.com> | 2015-07-02 11:36:42 -0400 |
---|---|---|
committer | Adam Midvidy <amidvidy@gmail.com> | 2015-07-02 11:55:42 -0400 |
commit | f1de5573a8861ba5f497f3588a5005a8238d2b91 (patch) | |
tree | 6a2939de3700d3da69ff4c1e830e13b063f5306e | |
parent | 32608244469587175f22af79075501d644da41bf (diff) | |
download | mongo-f1de5573a8861ba5f497f3588a5005a8238d2b91.tar.gz |
SERVER-19156 refactor per-connection state from AsyncOp to AsyncConnection
-rw-r--r-- | src/mongo/client/connection_pool.cpp | 12 | ||||
-rw-r--r-- | src/mongo/client/connection_pool.h | 8 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_asio.cpp | 37 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_asio.h | 31 |
4 files changed, 69 insertions, 19 deletions
diff --git a/src/mongo/client/connection_pool.cpp b/src/mongo/client/connection_pool.cpp index 99590127850..5e9062ca831 100644 --- a/src/mongo/client/connection_pool.cpp +++ b/src/mongo/client/connection_pool.cpp @@ -223,6 +223,18 @@ ConnectionPool::ConnectionPtr::~ConnectionPtr() { } } +ConnectionPool::ConnectionPtr::ConnectionPtr(ConnectionPtr&& other) + : _pool(std::move(other._pool)), _connInfo(std::move(other._connInfo)) { + other._pool = nullptr; +} + +ConnectionPool::ConnectionPtr& ConnectionPool::ConnectionPtr::operator=(ConnectionPtr&& other) { + _pool = std::move(other._pool); + _connInfo = std::move(other._connInfo); + other._pool = nullptr; + return *this; +} + void ConnectionPool::ConnectionPtr::done(Date_t now) { _pool->releaseConnection(_connInfo, now); _pool = NULL; diff --git a/src/mongo/client/connection_pool.h b/src/mongo/client/connection_pool.h index 15427fe823d..8db5ceddc5b 100644 --- a/src/mongo/client/connection_pool.h +++ b/src/mongo/client/connection_pool.h @@ -96,6 +96,12 @@ public: */ ~ConnectionPtr(); + // We need to provide user defined move operations as we need to set the pool + // pointer to nullptr on the moved-from object. + ConnectionPtr(ConnectionPtr&&); + + ConnectionPtr& operator=(ConnectionPtr&&); + /** * Obtains the underlying connection which can be used for making calls to the server. */ @@ -110,7 +116,7 @@ public: private: ConnectionPool* _pool; - const ConnectionList::iterator _connInfo; + ConnectionList::iterator _connInfo; }; diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp index 8baa3506941..e766eec28cf 100644 --- a/src/mongo/executor/network_interface_asio.cpp +++ b/src/mongo/executor/network_interface_asio.cpp @@ -33,6 +33,7 @@ #include "mongo/executor/network_interface_asio.h" #include <chrono> +#include <utility> #include "mongo/bson/bsonobj.h" #include "mongo/db/client.h" @@ -50,6 +51,14 @@ using ResponseStatus = TaskExecutor::ResponseStatus; using asio::ip::tcp; using RemoteCommandCompletionFn = stdx::function<void(const ResponseStatus&)>; +NetworkInterfaceASIO::AsyncConnection::AsyncConnection( + ConnectionPool::ConnectionPtr&& bootstrapConn, asio::ip::tcp::socket&& sock) + : _bootstrapConn{std::move(bootstrapConn)}, _sock(std::move(sock)) {} + +tcp::socket* NetworkInterfaceASIO::AsyncConnection::sock() { + return &_sock; +} + NetworkInterfaceASIO::AsyncOp::AsyncOp(const TaskExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish, @@ -63,28 +72,36 @@ NetworkInterfaceASIO::AsyncOp::AsyncOp(const TaskExecutor::CallbackHandle& cbHan _canceled(0), _id(id) {} + void NetworkInterfaceASIO::AsyncOp::connect(ConnectionPool* const pool, asio::io_service* service, Date_t now) { - _conn = stdx::make_unique<ConnectionPool::ConnectionPtr>( - pool, _request.target, now, Milliseconds(1000)); + // TODO, why is this hardcoded to 1 second? That seems too low. + ConnectionPool::ConnectionPtr conn(pool, _request.target, now, Milliseconds(1000)); _state = OpState::kConnectionAcquired; // TODO: Add a case here for unix domain sockets. - int protocol = _conn->get()->port().localAddr().getType(); + int protocol = conn.get()->port().localAddr().getType(); if (protocol != AF_INET && protocol != AF_INET6) { throw SocketException(SocketException::CONNECT_ERROR, "Unsupported family"); } _state = OpState::kConnectionVerified; - _sock = stdx::make_unique<tcp::socket>( - *service, protocol == AF_INET ? tcp::v4() : tcp::v6(), _conn->get()->port().psock->rawFD()); + tcp::socket sock{ + *service, protocol == AF_INET ? tcp::v4() : tcp::v6(), conn.get()->port().psock->rawFD()}; + + _connection.emplace(std::move(conn), std::move(sock)); _state = OpState::kConnected; } +NetworkInterfaceASIO::AsyncConnection* NetworkInterfaceASIO::AsyncOp::connection() { + invariant(_connection.is_initialized()); + return _connection.get_ptr(); +} + std::string NetworkInterfaceASIO::AsyncOp::toString() const { str::stream output; output << "op number: " << _id; @@ -155,10 +172,6 @@ void NetworkInterfaceASIO::AsyncOp::setOutput(const BSONObj& bson) { _output = bson; } -tcp::socket* NetworkInterfaceASIO::AsyncOp::sock() { - return _sock.get(); -} - Date_t NetworkInterfaceASIO::AsyncOp::start() const { return _start; } @@ -213,7 +226,7 @@ void NetworkInterfaceASIO::_messageFromRequest(const RemoteCommandRequest& reque } void NetworkInterfaceASIO::_asyncSendSimpleMessage(AsyncOp* op, const asio::const_buffer& buf) { - asio::async_write(*(op->sock()), + asio::async_write(*(op->connection()->sock()), asio::buffer(buf), [this, op](std::error_code ec, std::size_t bytes) { if (ec) @@ -272,7 +285,7 @@ void NetworkInterfaceASIO::_recvMessageBody(AsyncOp* op) { invariant(bodyLength >= 0); // receive remaining data into md->data - asio::async_read(*(op->sock()), + asio::async_read(*(op->connection()->sock()), asio::buffer(mdView.data(), bodyLength), [this, op, mdView](asio::error_code ec, size_t bytes) { if (ec) { @@ -288,7 +301,7 @@ void NetworkInterfaceASIO::_recvMessageBody(AsyncOp* op) { } void NetworkInterfaceASIO::_recvMessageHeader(AsyncOp* op) { - asio::async_read(*(op->sock()), + asio::async_read(*(op->connection()->sock()), asio::buffer(reinterpret_cast<char*>(op->header()), sizeof(MSGHEADER::Value)), [this, op](asio::error_code ec, size_t bytes) { if (ec) { diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h index f1e91173f44..0a6a13a08bd 100644 --- a/src/mongo/executor/network_interface_asio.h +++ b/src/mongo/executor/network_interface_asio.h @@ -30,6 +30,7 @@ #include <asio.hpp> #include <atomic> +#include <boost/optional.hpp> #include <system_error> #include <thread> #include <unordered_map> @@ -72,6 +73,22 @@ private: enum class State { kReady, kRunning, kShutdown }; /** + * AsyncConnection encapsulates the per-connection state we maintain. + */ + class AsyncConnection { + public: + AsyncConnection( + ConnectionPool::ConnectionPtr&& booststrapConn, + asio::ip::tcp::socket&& sock); + + asio::ip::tcp::socket* sock(); + + private: + ConnectionPool::ConnectionPtr _bootstrapConn; + asio::ip::tcp::socket _sock; + }; + + /** * Helper object to manage individual network operations. */ class AsyncOp { @@ -91,6 +108,8 @@ private: void complete(Date_t now); + AsyncConnection* connection(); + void connect(ConnectionPool* const pool, asio::io_service* service, Date_t now); bool connected() const; @@ -102,8 +121,6 @@ private: void setOutput(const BSONObj& bson); - asio::ip::tcp::socket* sock(); - Date_t start() const; Message* toSend(); @@ -123,15 +140,17 @@ private: RemoteCommandRequest _request; RemoteCommandCompletionFn _onFinish; + /** + * The connection state used to service this request. We wrap it in an optional + * as it is instantiated at some point after the AsyncOp is created. + */ + boost::optional<AsyncConnection> _connection; + const Date_t _start; OpState _state; AtomicUInt64 _canceled; - std::unique_ptr<ConnectionPool::ConnectionPtr> _conn; - - std::unique_ptr<asio::ip::tcp::socket> _sock; - Message _toSend; Message _toRecv; MSGHEADER::Value _header; |