summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-06-09 20:21:49 -0700
committerSage Weil <sage@inktank.com>2013-06-13 10:52:18 -0700
commite96c0ceec7b22b93a21bea70b005687966566c39 (patch)
tree1da537c1ef2398d0479d4f9b09c851d33b74279e
parent77db175c9d8b565e302393d785c3f350f1338aef (diff)
downloadceph-e96c0ceec7b22b93a21bea70b005687966566c39.tar.gz
msgr: use ConnectionRef throughout
Make RefCountedObject a private parent of Connection so that users are forced to use ConnectionRef whenever references are taken. Many methods can still take a raw Connection* when they are using the caller's reference but not taking their own; this is cheaper than twiddling the reference count, and the lifetime is still well defined. Local variables generally use ConnectionRef, though. Signed-off-by: Sage Weil <sage@inktank.com>
-rw-r--r--src/client/Client.cc17
-rw-r--r--src/client/MetaSession.cc2
-rw-r--r--src/client/MetaSession.h2
-rw-r--r--src/mds/MDS.h6
-rw-r--r--src/mds/Server.cc6
-rw-r--r--src/mds/SessionMap.h2
-rw-r--r--src/mon/AuthMonitor.cc2
-rw-r--r--src/mon/MonClient.cc15
-rw-r--r--src/mon/MonClient.h2
-rw-r--r--src/mon/Monitor.cc19
-rw-r--r--src/mon/Monitor.h6
-rw-r--r--src/mon/PGMonitor.cc2
-rw-r--r--src/mon/Session.h10
-rw-r--r--src/msg/DispatchQueue.cc2
-rw-r--r--src/msg/Message.h21
-rw-r--r--src/msg/Messenger.h10
-rw-r--r--src/msg/Pipe.cc21
-rw-r--r--src/msg/Pipe.h2
-rw-r--r--src/msg/SimpleMessenger.cc13
-rw-r--r--src/msg/SimpleMessenger.h6
-rw-r--r--src/osd/OSD.cc37
-rw-r--r--src/osd/OSD.h25
-rw-r--r--src/osdc/Objecter.cc7
-rw-r--r--src/osdc/Objecter.h4
24 files changed, 98 insertions, 141 deletions
diff --git a/src/client/Client.cc b/src/client/Client.cc
index 21cf2678607..204dc98d74d 100644
--- a/src/client/Client.cc
+++ b/src/client/Client.cc
@@ -776,7 +776,7 @@ void Client::update_dir_dist(Inode *in, DirStat *dst)
void Client::insert_readdir_results(MetaRequest *request, MetaSession *session, Inode *diri) {
MClientReply *reply = request->reply;
- Connection *con = request->reply->get_connection();
+ ConnectionRef con = request->reply->get_connection();
uint64_t features = con->get_features();
assert(request->readdir_result.empty());
@@ -918,7 +918,7 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
return NULL;
}
- Connection *con = request->reply->get_connection();
+ ConnectionRef con = request->reply->get_connection();
uint64_t features = con->get_features();
ldout(cct, 10) << " features 0x" << hex << features << dec << dendl;
@@ -1522,7 +1522,7 @@ void Client::handle_client_session(MClientSession *m)
int from = m->get_source().num();
ldout(cct, 10) << "handle_client_session " << *m << " from mds." << from << dendl;
- MetaSession *session = _get_mds_session(from, m->get_connection());
+ MetaSession *session = _get_mds_session(from, m->get_connection().get());
if (!session) {
ldout(cct, 10) << " discarding session message from sessionless mds " << m->get_source_inst() << dendl;
m->put();
@@ -1639,7 +1639,7 @@ MClientRequest* Client::build_client_request(MetaRequest *request)
void Client::handle_client_request_forward(MClientRequestForward *fwd)
{
int mds = fwd->get_source().num();
- MetaSession *session = _get_mds_session(mds, fwd->get_connection());
+ MetaSession *session = _get_mds_session(mds, fwd->get_connection().get());
if (!session) {
fwd->put();
return;
@@ -1677,7 +1677,7 @@ void Client::handle_client_request_forward(MClientRequestForward *fwd)
void Client::handle_client_reply(MClientReply *reply)
{
int mds_num = reply->get_source().num();
- MetaSession *session = _get_mds_session(mds_num, reply->get_connection());
+ MetaSession *session = _get_mds_session(mds_num, reply->get_connection().get());
if (!session) {
reply->put();
return;
@@ -1879,7 +1879,6 @@ void Client::handle_mds_map(MMDSMap* m)
mds_sessions.count(p->first)) {
MetaSession *session = mds_sessions[p->first];
session->inst = mdsmap->get_inst(p->first);
- session->con->put();
session->con = messenger->get_connection(session->inst);
send_reconnect(session);
}
@@ -2023,7 +2022,7 @@ void Client::handle_lease(MClientLease *m)
assert(m->get_action() == CEPH_MDS_LEASE_REVOKE);
int mds = m->get_source().num();
- MetaSession *session = _get_mds_session(mds, m->get_connection());
+ MetaSession *session = _get_mds_session(mds, m->get_connection().get());
if (!session) {
m->put();
return;
@@ -3218,7 +3217,7 @@ void Client::handle_snap(MClientSnap *m)
{
ldout(cct, 10) << "handle_snap " << *m << dendl;
int mds = m->get_source().num();
- MetaSession *session = _get_mds_session(mds, m->get_connection());
+ MetaSession *session = _get_mds_session(mds, m->get_connection().get());
if (!session) {
m->put();
return;
@@ -3294,7 +3293,7 @@ void Client::handle_snap(MClientSnap *m)
void Client::handle_caps(MClientCaps *m)
{
int mds = m->get_source().num();
- MetaSession *session = _get_mds_session(mds, m->get_connection());
+ MetaSession *session = _get_mds_session(mds, m->get_connection().get());
if (!session) {
m->put();
return;
diff --git a/src/client/MetaSession.cc b/src/client/MetaSession.cc
index 6c1685217d2..ee38db36d97 100644
--- a/src/client/MetaSession.cc
+++ b/src/client/MetaSession.cc
@@ -36,6 +36,4 @@ MetaSession::~MetaSession()
{
if (release)
release->put();
- if (con)
- con->put();
}
diff --git a/src/client/MetaSession.h b/src/client/MetaSession.h
index 0cff7586c36..1f6a1240617 100644
--- a/src/client/MetaSession.h
+++ b/src/client/MetaSession.h
@@ -19,7 +19,7 @@ class MClientCapRelease;
struct MetaSession {
int mds_num;
- Connection *con;
+ ConnectionRef con;
version_t seq;
uint64_t cap_gen;
utime_t cap_ttl, last_cap_renew_request;
diff --git a/src/mds/MDS.h b/src/mds/MDS.h
index 4e69dcaf8f9..9e3e2dae9c3 100644
--- a/src/mds/MDS.h
+++ b/src/mds/MDS.h
@@ -346,8 +346,14 @@ class MDS : public Dispatcher {
void send_message_client_counted(Message *m, client_t client);
void send_message_client_counted(Message *m, Session *session);
void send_message_client_counted(Message *m, Connection *connection);
+ void send_message_client_counted(Message *m, const ConnectionRef& con) {
+ send_message_client_counted(m, con.get());
+ }
void send_message_client(Message *m, Session *session);
void send_message(Message *m, Connection *c);
+ void send_message(Message *m, const ConnectionRef& c) {
+ send_message(m, c.get());
+ }
// start up, shutdown
int init(int wanted_state=MDSMap::STATE_BOOT);
diff --git a/src/mds/Server.cc b/src/mds/Server.cc
index abeea2c12d5..253c56d7a37 100644
--- a/src/mds/Server.cc
+++ b/src/mds/Server.cc
@@ -306,7 +306,7 @@ void Server::_session_logged(Session *session, uint64_t state_seq, bool open, ve
// ms_handle_remote_reset() and realize they had in fact closed.
// do this *before* sending the message to avoid a possible
// race.
- mds->messenger->mark_disposable(session->connection);
+ mds->messenger->mark_disposable(session->connection.get());
// reset session
mds->send_message_client(new MClientSession(CEPH_SESSION_CLOSE), session);
@@ -901,8 +901,7 @@ void Server::reply_request(MDRequest *mdr, MClientReply *reply, CInode *tracei,
}
// note client connection to direct my reply
- Connection *client_con = req->get_connection();
- client_con->get();
+ ConnectionRef client_con = req->get_connection();
// drop non-rdlocks before replying, so that we can issue leases
mdcache->request_drop_non_rdlocks(mdr);
@@ -929,7 +928,6 @@ void Server::reply_request(MDRequest *mdr, MClientReply *reply, CInode *tracei,
reply->set_mdsmap_epoch(mds->mdsmap->get_epoch());
messenger->send_message(reply, client_con);
}
- client_con->put();
// clean up request
mdcache->request_finish(mdr);
diff --git a/src/mds/SessionMap.h b/src/mds/SessionMap.h
index 905a99b81c1..f3e7e0990e1 100644
--- a/src/mds/SessionMap.h
+++ b/src/mds/SessionMap.h
@@ -84,7 +84,7 @@ private:
public:
session_info_t info; ///< durable bits
- Connection *connection;
+ ConnectionRef connection;
xlist<Session*>::item item_session_list;
list<Message*> preopen_out_queue; ///< messages for client, queued before they connect
diff --git a/src/mon/AuthMonitor.cc b/src/mon/AuthMonitor.cc
index e4e0dacd8ba..ec639c6d783 100644
--- a/src/mon/AuthMonitor.cc
+++ b/src/mon/AuthMonitor.cc
@@ -479,7 +479,7 @@ bool AuthMonitor::prep_auth(MAuth *m, bool paxos_writable)
// always send the latest monmap.
if (m->monmap_epoch < mon->monmap->get_epoch())
- mon->send_latest_monmap(m->get_connection());
+ mon->send_latest_monmap(m->get_connection().get());
proto = s->auth_handler->start_session(entity_name, indata, response_bl, caps_info);
ret = 0;
diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc
index b52ce603158..d7003c13520 100644
--- a/src/mon/MonClient.cc
+++ b/src/mon/MonClient.cc
@@ -139,7 +139,6 @@ int MonClient::get_monmap_privately()
if (monmap.fsid.is_zero()) {
messenger->mark_down(cur_con); // nope, clean that connection up
- cur_con->put();
}
}
@@ -156,10 +155,7 @@ int MonClient::get_monmap_privately()
hunting = true; // reset this to true!
cur_mon.clear();
- if (cur_con) {
- cur_con->put();
- cur_con = NULL;
- }
+ cur_con.reset(NULL);
if (!monmap.fsid.is_zero())
return 0;
@@ -333,10 +329,8 @@ void MonClient::shutdown()
}
monc_lock.Lock();
timer.shutdown();
- if (cur_con) {
- cur_con->put();
- cur_con = NULL;
- }
+
+ cur_con.reset(NULL);
monc_lock.Unlock();
}
@@ -495,7 +489,6 @@ void MonClient::_pick_new_mon()
if (cur_con) {
messenger->mark_down(cur_con);
- cur_con->put();
}
cur_con = messenger->get_connection(monmap.get_inst(cur_mon));
@@ -591,7 +584,7 @@ void MonClient::tick()
if (now > sub_renew_after)
_renew_subs();
- messenger->send_keepalive(cur_con);
+ messenger->send_keepalive(cur_con.get());
if (state == MC_STATE_HAVE_SESSION) {
send_log();
diff --git a/src/mon/MonClient.h b/src/mon/MonClient.h
index 04e6b0e44b5..b5333e1714a 100644
--- a/src/mon/MonClient.h
+++ b/src/mon/MonClient.h
@@ -63,7 +63,7 @@ private:
Messenger *messenger;
string cur_mon;
- Connection *cur_con;
+ ConnectionRef cur_con;
SimpleRNG rng;
diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc
index 5a3d6fbb7b4..986d8633cc3 100644
--- a/src/mon/Monitor.cc
+++ b/src/mon/Monitor.cc
@@ -211,8 +211,6 @@ Monitor::~Monitor()
delete paxos;
assert(session_map.sessions.empty());
delete mon_caps;
- if (con_self)
- con_self->put();
}
@@ -2912,7 +2910,6 @@ void Monitor::forward_request_leader(PaxosServiceMessage *req)
rr->tid = ++routed_request_tid;
rr->client_inst = req->get_source_inst();
rr->con = req->get_connection();
- rr->con->get();
encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features
rr->session = static_cast<MonSession *>(session->get());
routed_requests[rr->tid] = rr;
@@ -2951,7 +2948,7 @@ void Monitor::handle_forward(MForward *m)
s->caps = m->client_caps;
dout(10) << " caps are " << s->caps << dendl;
- s->proxy_con = m->get_connection()->get();
+ s->proxy_con = m->get_connection();
s->proxy_tid = m->tid;
PaxosServiceMessage *req = m->msg;
@@ -2972,8 +2969,7 @@ void Monitor::handle_forward(MForward *m)
or the Session. And due to the special nature of this message,
nobody refers to the Connection via the Session. So, clear out that
half of the ref loop.*/
- s->con->put();
- s->con = NULL;
+ s->con.reset(NULL);
dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl;
@@ -3105,7 +3101,6 @@ void Monitor::resend_routed_requests()
if (mon == rank) {
dout(10) << " requeue for self tid " << rr->tid << " " << *req << dendl;
req->set_connection(rr->con);
- rr->con->get();
retry.push_back(new C_RetryMessage(this, req));
delete rr;
} else {
@@ -3172,7 +3167,7 @@ void Monitor::waitlist_or_zap_client(Message *m)
* 3) command messages. We want to accept these under all possible
* circumstances.
*/
- Connection *con = m->get_connection();
+ ConnectionRef con = m->get_connection();
utime_t too_old = ceph_clock_now(g_ceph_context);
too_old -= g_ceph_context->_conf->mon_lease;
if (m->get_recv_stamp() > too_old &&
@@ -3195,7 +3190,7 @@ bool Monitor::_ms_dispatch(Message *m)
return true;
}
- Connection *connection = m->get_connection();
+ ConnectionRef connection = m->get_connection();
MonSession *s = NULL;
MonCap caps;
EntityName entity_name;
@@ -3219,7 +3214,7 @@ bool Monitor::_ms_dispatch(Message *m)
return true;
}
dout(10) << "do not have session, making new one" << dendl;
- s = session_map.new_session(m->get_source_inst(), m->get_connection());
+ s = session_map.new_session(m->get_source_inst(), m->get_connection().get());
m->get_connection()->set_priv(s->get());
dout(10) << "ms_dispatch new session " << s << " for " << s->inst << dendl;
@@ -3921,7 +3916,7 @@ void Monitor::check_sub(Subscription *sub)
{
dout(10) << "check_sub monmap next " << sub->next << " have " << monmap->get_epoch() << dendl;
if (sub->next <= monmap->get_epoch()) {
- send_latest_monmap(sub->session->con);
+ send_latest_monmap(sub->session->con.get());
if (sub->onetime)
session_map.remove_sub(sub);
else
@@ -3942,7 +3937,7 @@ void Monitor::send_latest_monmap(Connection *con)
void Monitor::handle_mon_get_map(MMonGetMap *m)
{
dout(10) << "handle_mon_get_map" << dendl;
- send_latest_monmap(m->get_connection());
+ send_latest_monmap(m->get_connection().get());
m->put();
}
diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h
index e860f8ea375..9e94fa90aaa 100644
--- a/src/mon/Monitor.h
+++ b/src/mon/Monitor.h
@@ -109,7 +109,7 @@ public:
string name;
int rank;
Messenger *messenger;
- Connection *con_self;
+ ConnectionRef con_self;
Mutex lock;
SafeTimer timer;
@@ -1312,14 +1312,12 @@ public:
uint64_t tid;
bufferlist request_bl;
MonSession *session;
- Connection *con;
+ ConnectionRef con;
entity_inst_t client_inst;
~RoutedRequest() {
if (session)
session->put();
- if (con)
- con->put();
}
};
uint64_t routed_request_tid;
diff --git a/src/mon/PGMonitor.cc b/src/mon/PGMonitor.cc
index 58b1fc7bd52..1178fcc381b 100644
--- a/src/mon/PGMonitor.cc
+++ b/src/mon/PGMonitor.cc
@@ -1604,6 +1604,6 @@ void PGMonitor::check_sub(Subscription *sub)
{
if (sub->type == "osd_pg_creates") {
send_pg_creates(sub->session->inst.name.num(),
- sub->session->con);
+ sub->session->con.get());
}
}
diff --git a/src/mon/Session.h b/src/mon/Session.h
index c8a7f3c4595..7df6001581c 100644
--- a/src/mon/Session.h
+++ b/src/mon/Session.h
@@ -38,7 +38,7 @@ struct Subscription {
};
struct MonSession : public RefCountedObject {
- Connection *con;
+ ConnectionRef con;
entity_inst_t inst;
utime_t until;
utime_t time_established;
@@ -54,21 +54,17 @@ struct MonSession : public RefCountedObject {
AuthServiceHandler *auth_handler;
- Connection *proxy_con;
+ ConnectionRef proxy_con;
uint64_t proxy_tid;
MonSession(const entity_inst_t& i, Connection *c) :
- con(c->get()), inst(i), closed(false), item(this),
+ con(c), inst(i), closed(false), item(this),
auid(0),
global_id(0), notified_global_id(0), auth_handler(NULL),
proxy_con(NULL), proxy_tid(0) {
time_established = ceph_clock_now(g_ceph_context);
}
~MonSession() {
- if (con)
- con->put();
- if (proxy_con)
- proxy_con->put();
//generic_dout(0) << "~MonSession " << this << dendl;
// we should have been removed before we get destructed; see MonSessionMap::remove_session()
assert(!item.is_on_list());
diff --git a/src/msg/DispatchQueue.cc b/src/msg/DispatchQueue.cc
index 67bb52e6c7b..a92c357beb5 100644
--- a/src/msg/DispatchQueue.cc
+++ b/src/msg/DispatchQueue.cc
@@ -54,7 +54,7 @@ void DispatchQueue::enqueue(Message *m, int priority, uint64_t id)
void DispatchQueue::local_delivery(Message *m, int priority)
{
Mutex::Locker l(lock);
- m->set_connection(msgr->local_connection->get());
+ m->set_connection(msgr->local_connection.get());
add_arrival(m);
if (priority >= CEPH_MSG_PRIO_LOW) {
mqueue.enqueue_strict(
diff --git a/src/msg/Message.h b/src/msg/Message.h
index aca91184141..0089751c7ed 100644
--- a/src/msg/Message.h
+++ b/src/msg/Message.h
@@ -161,7 +161,7 @@
class Messenger;
-struct Connection : public RefCountedObject {
+struct Connection : private RefCountedObject {
Mutex lock;
Messenger *msgr;
RefCountedObject *priv;
@@ -174,6 +174,8 @@ struct Connection : public RefCountedObject {
int rx_buffers_version;
map<tid_t,pair<bufferlist,int> > rx_buffers;
+ friend class boost::intrusive_ptr<Connection>;
+
public:
Connection(Messenger *m)
: lock("Connection::lock"),
@@ -184,6 +186,9 @@ public:
pipe(NULL),
failed(false),
rx_buffers_version(0) {
+ // we are managed exlusively by ConnectionRef; make it so you can
+ // ConnectionRef foo = new Connection;
+ nref.set(0);
}
~Connection() {
//generic_dout(0) << "~Connection " << this << dendl;
@@ -195,10 +200,6 @@ public:
pipe->put();
}
- Connection *get() {
- return static_cast<Connection *>(RefCountedObject::get());
- }
-
void set_priv(RefCountedObject *o) {
Mutex::Locker l(lock);
if (priv)
@@ -304,7 +305,7 @@ protected:
/* time at which message was fully read */
utime_t recv_complete_stamp;
- Connection *connection;
+ ConnectionRef connection;
// release our size in bytes back to this throttler when our payload
// is adjusted or when we are destroyed.
@@ -352,18 +353,14 @@ public:
protected:
virtual ~Message() {
assert(nref.read() == 0);
- if (connection)
- connection->put();
if (byte_throttler)
byte_throttler->put(payload.length() + middle.length() + data.length());
if (msg_throttler)
msg_throttler->put();
}
public:
- Connection *get_connection() { return connection; }
- void set_connection(Connection *c) {
- if (connection)
- connection->put();
+ const ConnectionRef& get_connection() { return connection; }
+ void set_connection(const ConnectionRef& c) {
connection = c;
}
void set_byte_throttler(Throttle *t) { byte_throttler = t; }
diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h
index 13d34611e19..28643e10767 100644
--- a/src/msg/Messenger.h
+++ b/src/msg/Messenger.h
@@ -408,6 +408,9 @@ public:
* @return 0 on success, or -errno on failure.
*/
virtual int send_message(Message *m, Connection *con) = 0;
+ int send_message(Message *m, const ConnectionRef& con) {
+ return send_message(m, con.get());
+ }
/**
* Lazily queue the given Message for the given entity. Unlike with
* send_message(), lazy_send_message() will not establish a
@@ -450,11 +453,11 @@ public:
*
* @param dest The entity to get a connection for.
*/
- virtual Connection *get_connection(const entity_inst_t& dest) = 0;
+ virtual ConnectionRef get_connection(const entity_inst_t& dest) = 0;
/**
* Get the Connection object associated with ourselves.
*/
- virtual Connection *get_loopback_connection() = 0;
+ virtual ConnectionRef get_loopback_connection() = 0;
/**
* Send a "keepalive" ping to the given dest, if it has a working Connection.
* If the Messenger doesn't already have a Connection, or if the underlying
@@ -494,6 +497,9 @@ public:
* @param con The Connection to mark down.
*/
virtual void mark_down(Connection *con) = 0;
+ void mark_down(const ConnectionRef& con) {
+ mark_down(con.get());
+ }
/**
* Unlike mark_down, this function will try and deliver
* all messages before ending the connection, and it will use
diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc
index 42d461ac2f8..789f7906254 100644
--- a/src/msg/Pipe.cc
+++ b/src/msg/Pipe.cc
@@ -72,7 +72,7 @@ Pipe::Pipe(SimpleMessenger *r, int st, Connection *con)
connect_seq(0), peer_global_seq(0),
out_seq(0), in_seq(0), in_seq_acked(0) {
if (con) {
- connection_state = con->get();
+ connection_state = con;
connection_state->reset_pipe(this);
} else {
connection_state = new Connection(msgr);
@@ -93,8 +93,6 @@ Pipe::~Pipe()
{
assert(out_q.empty());
assert(sent.empty());
- if (connection_state)
- connection_state->put();
delete session_security;
delete delay_thread;
}
@@ -390,7 +388,7 @@ int Pipe::accept()
// Check the authorizer. If not good, bail out.
- if (!msgr->verify_authorizer(connection_state, peer_type, connect.authorizer_protocol, authorizer,
+ if (!msgr->verify_authorizer(connection_state.get(), peer_type, connect.authorizer_protocol, authorizer,
authorizer_reply, authorizer_valid, session_key) ||
!authorizer_valid) {
ldout(msgr->cct,0) << "accept: got bad authorizer" << dendl;
@@ -569,8 +567,7 @@ int Pipe::accept()
// drop my Connection, and take a ref to the existing one. do not
// clear existing->connection_state, since read_message and
// write_message both dereference it without pipe_lock.
- connection_state->put();
- connection_state = existing->connection_state->get();
+ connection_state = existing->connection_state;
// make existing Connection reference us
existing->connection_state->reset_pipe(this);
@@ -622,7 +619,7 @@ int Pipe::accept()
connection_state->get_features());
// notify
- msgr->dispatch_queue.queue_accept(connection_state);
+ msgr->dispatch_queue.queue_accept(connection_state.get());
// ok!
if (msgr->dispatch_queue.stop)
@@ -1041,7 +1038,7 @@ int Pipe::connect()
session_security = NULL;
}
- msgr->dispatch_queue.queue_connect(connection_state);
+ msgr->dispatch_queue.queue_connect(connection_state.get());
if (!reader_running) {
ldout(msgr->cct,20) << "connect starting reader" << dendl;
@@ -1207,7 +1204,7 @@ void Pipe::fault(bool onread)
assert(connection_state);
connection_state->clear_pipe(this);
- msgr->dispatch_queue.queue_reset(connection_state);
+ msgr->dispatch_queue.queue_reset(connection_state.get());
return;
}
@@ -1273,7 +1270,7 @@ void Pipe::was_session_reset()
delay_thread->discard();
discard_out_queue();
- msgr->dispatch_queue.queue_remote_reset(connection_state);
+ msgr->dispatch_queue.queue_remote_reset(connection_state.get());
if (randomize_out_seq()) {
lsubdout(msgr->cct,ms,15) << "was_session_reset(): Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
@@ -1383,7 +1380,7 @@ void Pipe::reader()
continue;
}
- m->set_connection(connection_state->get());
+ m->set_connection(connection_state.get());
// note last received message.
in_seq = m->get_seq();
@@ -1516,7 +1513,7 @@ void Pipe::writer()
}
// associate message with Connection (for benefit of encode_payload)
- m->set_connection(connection_state->get());
+ m->set_connection(connection_state.get());
ldout(msgr->cct,20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl;
diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h
index e2a155a6038..b359bc2caf7 100644
--- a/src/msg/Pipe.h
+++ b/src/msg/Pipe.h
@@ -150,7 +150,7 @@ class DispatchQueue;
protected:
friend class SimpleMessenger;
- Connection *connection_state;
+ ConnectionRef connection_state;
utime_t backoff; // backoff time
diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc
index c9764fac324..6b1f309ac35 100644
--- a/src/msg/SimpleMessenger.cc
+++ b/src/msg/SimpleMessenger.cc
@@ -66,7 +66,6 @@ SimpleMessenger::~SimpleMessenger()
assert(!did_bind); // either we didn't bind or we shut down the Accepter
assert(rank_pipe.empty()); // we don't have any running Pipes.
assert(reaper_stop && !reaper_started); // the reaper thread is stopped
- local_connection->put();
}
void SimpleMessenger::ready()
@@ -112,7 +111,7 @@ int SimpleMessenger::_send_message(Message *m, const entity_inst_t& dest,
lock.Lock();
Pipe *pipe = _lookup_pipe(dest.addr);
- submit_message(m, (pipe ? pipe->connection_state : NULL),
+ submit_message(m, (pipe ? pipe->connection_state.get() : NULL),
dest.addr, dest.name.type(), lazy);
lock.Unlock();
return 0;
@@ -357,12 +356,12 @@ bool SimpleMessenger::verify_authorizer(Connection *con, int peer_type,
return ms_deliver_verify_authorizer(con, peer_type, protocol, authorizer, authorizer_reply, isvalid,session_key);
}
-Connection *SimpleMessenger::get_connection(const entity_inst_t& dest)
+ConnectionRef SimpleMessenger::get_connection(const entity_inst_t& dest)
{
Mutex::Locker l(lock);
if (my_inst.addr == dest.addr) {
// local
- return static_cast<Connection *>(local_connection->get());
+ return local_connection;
}
// remote
@@ -376,14 +375,14 @@ Connection *SimpleMessenger::get_connection(const entity_inst_t& dest)
}
Mutex::Locker l(pipe->pipe_lock);
if (pipe->connection_state)
- return static_cast<Connection *>(pipe->connection_state->get());
+ return pipe->connection_state;
// we failed too quickly! retry. FIXME.
}
}
-Connection *SimpleMessenger::get_loopback_connection()
+ConnectionRef SimpleMessenger::get_loopback_connection()
{
- return static_cast<Connection*>(local_connection->get());
+ return local_connection;
}
void SimpleMessenger::submit_message(Message *m, Connection *con,
diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h
index 0d54d174965..47ee145aa5e 100644
--- a/src/msg/SimpleMessenger.h
+++ b/src/msg/SimpleMessenger.h
@@ -303,8 +303,8 @@ public:
* @param dest The entity to get a connection for.
* @return The requested Connection, as a pointer whose reference you own.
*/
- virtual Connection *get_connection(const entity_inst_t& dest);
- virtual Connection *get_loopback_connection();
+ virtual ConnectionRef get_connection(const entity_inst_t& dest);
+ virtual ConnectionRef get_loopback_connection();
/**
* Send a "keepalive" ping to the given dest, if it has a working Connection.
* If the Messenger doesn't already have a Connection, or if the underlying
@@ -555,7 +555,7 @@ public:
int timeout;
/// con used for sending messages to ourselves
- Connection *local_connection;
+ ConnectionRef local_connection;
/**
* @defgroup SimpleMessenger internals
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index b482865b3d1..a18c4459719 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -2512,11 +2512,9 @@ void OSD::_add_heartbeat_peer(int p)
hi->peer = p;
HeartbeatSession *s = new HeartbeatSession(p);
hi->con_back = cons.first.get();
- hi->con_back->get();
hi->con_back->set_priv(s);
if (cons.second) {
hi->con_front = cons.second.get();
- hi->con_front->get();
hi->con_front->set_priv(s->get());
}
dout(10) << "_add_heartbeat_peer: new peer osd." << p
@@ -2538,10 +2536,8 @@ void OSD::_remove_heartbeat_peer(int n)
<< " " << (q->second.con_front ? q->second.con_front->get_peer_addr() : entity_addr_t())
<< dendl;
hbclient_messenger->mark_down(q->second.con_back);
- q->second.con_back->put();
if (q->second.con_front) {
hbclient_messenger->mark_down(q->second.con_front);
- q->second.con_front->put();
}
heartbeat_peers.erase(q);
}
@@ -2671,10 +2667,8 @@ void OSD::reset_heartbeat_peers()
while (!heartbeat_peers.empty()) {
HeartbeatInfo& hi = heartbeat_peers.begin()->second;
hbclient_messenger->mark_down(hi.con_back);
- hi.con_back->put();
if (hi.con_front) {
hbclient_messenger->mark_down(hi.con_front);
- hi.con_front->put();
}
heartbeat_peers.erase(heartbeat_peers.begin());
}
@@ -2966,22 +2960,18 @@ bool OSD::heartbeat_reset(Connection *con)
<< ", reopening" << dendl;
if (con != p->second.con_back) {
hbclient_messenger->mark_down(p->second.con_back);
- p->second.con_back->put();
}
- p->second.con_back = NULL;
+ p->second.con_back.reset(NULL);
if (p->second.con_front && con != p->second.con_front) {
hbclient_messenger->mark_down(p->second.con_front);
- p->second.con_front->put();
}
- p->second.con_front = NULL;
+ p->second.con_front.reset(NULL);
pair<ConnectionRef,ConnectionRef> newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch);
if (newcon.first) {
p->second.con_back = newcon.first.get();
- p->second.con_back->get();
p->second.con_back->set_priv(s->get());
if (newcon.second) {
p->second.con_front = newcon.second.get();
- p->second.con_front->get();
p->second.con_front->set_priv(s->get());
}
} else {
@@ -3553,10 +3543,7 @@ ConnectionRef OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch)
next_osdmap->get_info(peer).up_from > from_epoch) {
return NULL;
}
- ConnectionRef ret(
- osd->cluster_messenger->get_connection(next_osdmap->get_cluster_inst(peer)));
- ret->put(); // Ref from get_connection
- return ret;
+ return osd->cluster_messenger->get_connection(next_osdmap->get_cluster_inst(peer));
}
pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
@@ -3572,10 +3559,7 @@ pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t f
return ret;
}
ret.first = osd->hbclient_messenger->get_connection(next_osdmap->get_hb_back_inst(peer));
- ret.first->put(); // Ref from get_connection
ret.second = osd->hbclient_messenger->get_connection(next_osdmap->get_hb_front_inst(peer));
- if (ret.second)
- ret.second->put(); // Ref from get_connection
return ret;
}
@@ -3759,7 +3743,7 @@ void OSD::handle_command(MMonCommand *m)
void OSD::handle_command(MCommand *m)
{
- Connection *con = m->get_connection();
+ ConnectionRef con = m->get_connection();
Session *session = static_cast<Session *>(con->get_priv());
if (!session) {
client_messenger->send_message(new MCommandReply(m, -EPERM), con);
@@ -3776,7 +3760,7 @@ void OSD::handle_command(MCommand *m)
return;
}
- Command *c = new Command(m->cmd, m->get_tid(), m->get_data(), con);
+ Command *c = new Command(m->cmd, m->get_tid(), m->get_data(), con.get());
command_wq.queue(c);
m->put();
@@ -4219,9 +4203,8 @@ bool OSD::heartbeat_dispatch(Message *m)
case CEPH_MSG_OSD_MAP:
{
- Connection *self = cluster_messenger->get_loopback_connection();
+ ConnectionRef self = cluster_messenger->get_loopback_connection();
cluster_messenger->send_message(m, self);
- self->put();
}
break;
@@ -4767,10 +4750,8 @@ void OSD::note_down_osd(int peer)
map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(peer);
if (p != heartbeat_peers.end()) {
hbclient_messenger->mark_down(p->second.con_back);
- p->second.con_back->put();
if (p->second.con_front) {
hbclient_messenger->mark_down(p->second.con_front);
- p->second.con_front->put();
}
heartbeat_peers.erase(p);
}
@@ -6675,7 +6656,7 @@ void OSD::handle_op(OpRequestRef op)
return;
}
// share our map with sender, if they're old
- _share_map_incoming(m->get_source(), m->get_connection(), m->get_map_epoch(),
+ _share_map_incoming(m->get_source(), m->get_connection().get(), m->get_map_epoch(),
static_cast<Session *>(m->get_connection()->get_priv()));
if (op->rmw_flags == 0) {
@@ -6803,7 +6784,7 @@ void OSD::handle_sub_op(OpRequestRef op)
return;
// share our map with sender, if they're old
- _share_map_incoming(m->get_source(), m->get_connection(), m->map_epoch,
+ _share_map_incoming(m->get_source(), m->get_connection().get(), m->map_epoch,
static_cast<Session*>(m->get_connection()->get_priv()));
if (service.splitting(pgid)) {
@@ -6840,7 +6821,7 @@ void OSD::handle_sub_op_reply(OpRequestRef op)
if (!require_same_or_newer_map(op, m->get_map_epoch())) return;
// share our map with sender, if they're old
- _share_map_incoming(m->get_source(), m->get_connection(), m->get_map_epoch(),
+ _share_map_incoming(m->get_source(), m->get_connection().get(), m->get_map_epoch(),
static_cast<Session*>(m->get_connection()->get_priv()));
PG *pg = _have_pg(pgid) ? _lookup_pg(pgid) : NULL;
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 0e35250b79a..3d8218e6aa1 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -303,9 +303,15 @@ public:
void send_message_osd_cluster(Message *m, Connection *con) {
cluster_messenger->send_message(m, con);
}
+ void send_message_osd_cluster(Message *m, const ConnectionRef& con) {
+ cluster_messenger->send_message(m, con.get());
+ }
void send_message_osd_client(Message *m, Connection *con) {
client_messenger->send_message(m, con);
}
+ void send_message_osd_client(Message *m, const ConnectionRef& con) {
+ client_messenger->send_message(m, con.get());
+ }
entity_name_t get_cluster_msgr_name() {
return cluster_messenger->get_myname();
}
@@ -688,7 +694,7 @@ public:
OSDCap caps;
int64_t auid;
epoch_t last_sent_epoch;
- Connection *con;
+ ConnectionRef con;
WatchConState wstate;
Session() : auid(-1), last_sent_epoch(0), con(0) {}
@@ -699,8 +705,8 @@ private:
/// information about a heartbeat peer
struct HeartbeatInfo {
int peer; ///< peer
- Connection *con_front; ///< peer connection (front)
- Connection *con_back; ///< peer connection (back)
+ ConnectionRef con_front; ///< peer connection (front)
+ ConnectionRef con_back; ///< peer connection (back)
utime_t first_tx; ///< time we sent our first ping request
utime_t last_tx; ///< last time we sent a ping request
utime_t last_rx_front; ///< last time we got a ping reply on the front side
@@ -1242,17 +1248,10 @@ protected:
vector<string> cmd;
tid_t tid;
bufferlist indata;
- Connection *con;
+ ConnectionRef con;
Command(vector<string>& c, tid_t t, bufferlist& bl, Connection *co)
- : cmd(c), tid(t), indata(bl), con(co) {
- if (con)
- con->get();
- }
- ~Command() {
- if (con)
- con->put();
- }
+ : cmd(c), tid(t), indata(bl), con(co) {}
};
list<Command*> command_queue;
struct CommandWQ : public ThreadPool::WorkQueue<Command> {
@@ -1284,7 +1283,7 @@ protected:
delete c;
return;
}
- osd->do_command(c->con, c->tid, c->cmd, c->indata);
+ osd->do_command(c->con.get(), c->tid, c->cmd, c->indata);
osd->osd_lock.Unlock();
delete c;
}
diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc
index aae2159d69f..0ec92e2641d 100644
--- a/src/osdc/Objecter.cc
+++ b/src/osdc/Objecter.cc
@@ -913,7 +913,6 @@ void Objecter::reopen_session(OSDSession *s)
ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now " << inst << dendl;
if (s->con) {
messenger->mark_down(s->con);
- s->con->put();
logger->inc(l_osdc_osd_session_close);
}
s->con = messenger->get_connection(inst);
@@ -926,7 +925,6 @@ void Objecter::close_session(OSDSession *s)
ldout(cct, 10) << "close_session for osd." << s->osd << dendl;
if (s->con) {
messenger->mark_down(s->con);
- s->con->put();
logger->inc(l_osdc_osd_session_close);
}
s->ops.clear();
@@ -1362,8 +1360,6 @@ void Objecter::finish_op(Op *op)
op->session_item.remove_myself();
if (op->budgeted)
put_op_budget(op);
- if (op->con)
- op->con->put();
ops.erase(op->tid);
logger->set(l_osdc_op_active, ops.size());
@@ -1387,11 +1383,10 @@ void Objecter::send_op(Op *op)
if (op->con) {
ldout(cct, 20) << " revoking rx buffer for " << op->tid << " on " << op->con << dendl;
op->con->revoke_rx_buffer(op->tid);
- op->con->put();
}
if (op->outbl && op->outbl->length()) {
ldout(cct, 20) << " posting rx buffer for " << op->tid << " on " << op->session->con << dendl;
- op->con = op->session->con->get();
+ op->con = op->session->con;
op->con->post_rx_buffer(op->tid, *op->outbl);
}
diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h
index b64c8f77ed0..850d974472e 100644
--- a/src/osdc/Objecter.h
+++ b/src/osdc/Objecter.h
@@ -738,7 +738,7 @@ public:
vector<int> acting;
bool used_replica;
- Connection *con; // for rx buffer only
+ ConnectionRef con; // for rx buffer only
vector<OSDOp> ops;
@@ -1064,7 +1064,7 @@ public:
xlist<CommandOp*> command_ops;
int osd;
int incarnation;
- Connection *con;
+ ConnectionRef con;
OSDSession(int o) : osd(o), incarnation(0), con(NULL) {}
};