summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorAdam Midvidy <amidvidy@gmail.com>2015-07-02 11:36:42 -0400
committerAdam Midvidy <amidvidy@gmail.com>2015-07-02 11:55:42 -0400
commitf1de5573a8861ba5f497f3588a5005a8238d2b91 (patch)
tree6a2939de3700d3da69ff4c1e830e13b063f5306e /src/mongo
parent32608244469587175f22af79075501d644da41bf (diff)
downloadmongo-f1de5573a8861ba5f497f3588a5005a8238d2b91.tar.gz
SERVER-19156 refactor per-connection state from AsyncOp to AsyncConnection
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/client/connection_pool.cpp12
-rw-r--r--src/mongo/client/connection_pool.h8
-rw-r--r--src/mongo/executor/network_interface_asio.cpp37
-rw-r--r--src/mongo/executor/network_interface_asio.h31
4 files changed, 69 insertions, 19 deletions
diff --git a/src/mongo/client/connection_pool.cpp b/src/mongo/client/connection_pool.cpp
index 99590127850..5e9062ca831 100644
--- a/src/mongo/client/connection_pool.cpp
+++ b/src/mongo/client/connection_pool.cpp
@@ -223,6 +223,18 @@ ConnectionPool::ConnectionPtr::~ConnectionPtr() {
}
}
+ConnectionPool::ConnectionPtr::ConnectionPtr(ConnectionPtr&& other)
+ : _pool(std::move(other._pool)), _connInfo(std::move(other._connInfo)) {
+ other._pool = nullptr;
+}
+
+ConnectionPool::ConnectionPtr& ConnectionPool::ConnectionPtr::operator=(ConnectionPtr&& other) {
+ _pool = std::move(other._pool);
+ _connInfo = std::move(other._connInfo);
+ other._pool = nullptr;
+ return *this;
+}
+
void ConnectionPool::ConnectionPtr::done(Date_t now) {
_pool->releaseConnection(_connInfo, now);
_pool = NULL;
diff --git a/src/mongo/client/connection_pool.h b/src/mongo/client/connection_pool.h
index 15427fe823d..8db5ceddc5b 100644
--- a/src/mongo/client/connection_pool.h
+++ b/src/mongo/client/connection_pool.h
@@ -96,6 +96,12 @@ public:
*/
~ConnectionPtr();
+ // We need to provide user defined move operations as we need to set the pool
+ // pointer to nullptr on the moved-from object.
+ ConnectionPtr(ConnectionPtr&&);
+
+ ConnectionPtr& operator=(ConnectionPtr&&);
+
/**
* Obtains the underlying connection which can be used for making calls to the server.
*/
@@ -110,7 +116,7 @@ public:
private:
ConnectionPool* _pool;
- const ConnectionList::iterator _connInfo;
+ ConnectionList::iterator _connInfo;
};
diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp
index 8baa3506941..e766eec28cf 100644
--- a/src/mongo/executor/network_interface_asio.cpp
+++ b/src/mongo/executor/network_interface_asio.cpp
@@ -33,6 +33,7 @@
#include "mongo/executor/network_interface_asio.h"
#include <chrono>
+#include <utility>
#include "mongo/bson/bsonobj.h"
#include "mongo/db/client.h"
@@ -50,6 +51,14 @@ using ResponseStatus = TaskExecutor::ResponseStatus;
using asio::ip::tcp;
using RemoteCommandCompletionFn = stdx::function<void(const ResponseStatus&)>;
+NetworkInterfaceASIO::AsyncConnection::AsyncConnection(
+ ConnectionPool::ConnectionPtr&& bootstrapConn, asio::ip::tcp::socket&& sock)
+ : _bootstrapConn{std::move(bootstrapConn)}, _sock(std::move(sock)) {}
+
+tcp::socket* NetworkInterfaceASIO::AsyncConnection::sock() {
+ return &_sock;
+}
+
NetworkInterfaceASIO::AsyncOp::AsyncOp(const TaskExecutor::CallbackHandle& cbHandle,
const RemoteCommandRequest& request,
const RemoteCommandCompletionFn& onFinish,
@@ -63,28 +72,36 @@ NetworkInterfaceASIO::AsyncOp::AsyncOp(const TaskExecutor::CallbackHandle& cbHan
_canceled(0),
_id(id) {}
+
void NetworkInterfaceASIO::AsyncOp::connect(ConnectionPool* const pool,
asio::io_service* service,
Date_t now) {
- _conn = stdx::make_unique<ConnectionPool::ConnectionPtr>(
- pool, _request.target, now, Milliseconds(1000));
+ // TODO, why is this hardcoded to 1 second? That seems too low.
+ ConnectionPool::ConnectionPtr conn(pool, _request.target, now, Milliseconds(1000));
_state = OpState::kConnectionAcquired;
// TODO: Add a case here for unix domain sockets.
- int protocol = _conn->get()->port().localAddr().getType();
+ int protocol = conn.get()->port().localAddr().getType();
if (protocol != AF_INET && protocol != AF_INET6) {
throw SocketException(SocketException::CONNECT_ERROR, "Unsupported family");
}
_state = OpState::kConnectionVerified;
- _sock = stdx::make_unique<tcp::socket>(
- *service, protocol == AF_INET ? tcp::v4() : tcp::v6(), _conn->get()->port().psock->rawFD());
+ tcp::socket sock{
+ *service, protocol == AF_INET ? tcp::v4() : tcp::v6(), conn.get()->port().psock->rawFD()};
+
+ _connection.emplace(std::move(conn), std::move(sock));
_state = OpState::kConnected;
}
+NetworkInterfaceASIO::AsyncConnection* NetworkInterfaceASIO::AsyncOp::connection() {
+ invariant(_connection.is_initialized());
+ return _connection.get_ptr();
+}
+
std::string NetworkInterfaceASIO::AsyncOp::toString() const {
str::stream output;
output << "op number: " << _id;
@@ -155,10 +172,6 @@ void NetworkInterfaceASIO::AsyncOp::setOutput(const BSONObj& bson) {
_output = bson;
}
-tcp::socket* NetworkInterfaceASIO::AsyncOp::sock() {
- return _sock.get();
-}
-
Date_t NetworkInterfaceASIO::AsyncOp::start() const {
return _start;
}
@@ -213,7 +226,7 @@ void NetworkInterfaceASIO::_messageFromRequest(const RemoteCommandRequest& reque
}
void NetworkInterfaceASIO::_asyncSendSimpleMessage(AsyncOp* op, const asio::const_buffer& buf) {
- asio::async_write(*(op->sock()),
+ asio::async_write(*(op->connection()->sock()),
asio::buffer(buf),
[this, op](std::error_code ec, std::size_t bytes) {
if (ec)
@@ -272,7 +285,7 @@ void NetworkInterfaceASIO::_recvMessageBody(AsyncOp* op) {
invariant(bodyLength >= 0);
// receive remaining data into md->data
- asio::async_read(*(op->sock()),
+ asio::async_read(*(op->connection()->sock()),
asio::buffer(mdView.data(), bodyLength),
[this, op, mdView](asio::error_code ec, size_t bytes) {
if (ec) {
@@ -288,7 +301,7 @@ void NetworkInterfaceASIO::_recvMessageBody(AsyncOp* op) {
}
void NetworkInterfaceASIO::_recvMessageHeader(AsyncOp* op) {
- asio::async_read(*(op->sock()),
+ asio::async_read(*(op->connection()->sock()),
asio::buffer(reinterpret_cast<char*>(op->header()), sizeof(MSGHEADER::Value)),
[this, op](asio::error_code ec, size_t bytes) {
if (ec) {
diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h
index f1e91173f44..0a6a13a08bd 100644
--- a/src/mongo/executor/network_interface_asio.h
+++ b/src/mongo/executor/network_interface_asio.h
@@ -30,6 +30,7 @@
#include <asio.hpp>
#include <atomic>
+#include <boost/optional.hpp>
#include <system_error>
#include <thread>
#include <unordered_map>
@@ -72,6 +73,22 @@ private:
enum class State { kReady, kRunning, kShutdown };
/**
+ * AsyncConnection encapsulates the per-connection state we maintain.
+ */
+ class AsyncConnection {
+ public:
+ AsyncConnection(
+ ConnectionPool::ConnectionPtr&& booststrapConn,
+ asio::ip::tcp::socket&& sock);
+
+ asio::ip::tcp::socket* sock();
+
+ private:
+ ConnectionPool::ConnectionPtr _bootstrapConn;
+ asio::ip::tcp::socket _sock;
+ };
+
+ /**
* Helper object to manage individual network operations.
*/
class AsyncOp {
@@ -91,6 +108,8 @@ private:
void complete(Date_t now);
+ AsyncConnection* connection();
+
void connect(ConnectionPool* const pool, asio::io_service* service, Date_t now);
bool connected() const;
@@ -102,8 +121,6 @@ private:
void setOutput(const BSONObj& bson);
- asio::ip::tcp::socket* sock();
-
Date_t start() const;
Message* toSend();
@@ -123,15 +140,17 @@ private:
RemoteCommandRequest _request;
RemoteCommandCompletionFn _onFinish;
+ /**
+ * The connection state used to service this request. We wrap it in an optional
+ * as it is instantiated at some point after the AsyncOp is created.
+ */
+ boost::optional<AsyncConnection> _connection;
+
const Date_t _start;
OpState _state;
AtomicUInt64 _canceled;
- std::unique_ptr<ConnectionPool::ConnectionPtr> _conn;
-
- std::unique_ptr<asio::ip::tcp::socket> _sock;
-
Message _toSend;
Message _toRecv;
MSGHEADER::Value _header;