diff options
author | Sage Weil <sage@inktank.com> | 2013-06-09 20:21:49 -0700 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-06-13 10:52:18 -0700 |
commit | e96c0ceec7b22b93a21bea70b005687966566c39 (patch) | |
tree | 1da537c1ef2398d0479d4f9b09c851d33b74279e | |
parent | 77db175c9d8b565e302393d785c3f350f1338aef (diff) | |
download | ceph-e96c0ceec7b22b93a21bea70b005687966566c39.tar.gz |
msgr: use ConnectionRef throughout
Make RefCountedObject a private parent of Connection so that users are
forced to use ConnectionRef whenever references are taken.
Many methods can still take a raw Connection* when they are using the
caller's reference but not taking their own; this is cheaper than
twiddling the reference count, and the lifetime is still well defined.
Local variables generally use ConnectionRef, though.
Signed-off-by: Sage Weil <sage@inktank.com>
-rw-r--r-- | src/client/Client.cc | 17 | ||||
-rw-r--r-- | src/client/MetaSession.cc | 2 | ||||
-rw-r--r-- | src/client/MetaSession.h | 2 | ||||
-rw-r--r-- | src/mds/MDS.h | 6 | ||||
-rw-r--r-- | src/mds/Server.cc | 6 | ||||
-rw-r--r-- | src/mds/SessionMap.h | 2 | ||||
-rw-r--r-- | src/mon/AuthMonitor.cc | 2 | ||||
-rw-r--r-- | src/mon/MonClient.cc | 15 | ||||
-rw-r--r-- | src/mon/MonClient.h | 2 | ||||
-rw-r--r-- | src/mon/Monitor.cc | 19 | ||||
-rw-r--r-- | src/mon/Monitor.h | 6 | ||||
-rw-r--r-- | src/mon/PGMonitor.cc | 2 | ||||
-rw-r--r-- | src/mon/Session.h | 10 | ||||
-rw-r--r-- | src/msg/DispatchQueue.cc | 2 | ||||
-rw-r--r-- | src/msg/Message.h | 21 | ||||
-rw-r--r-- | src/msg/Messenger.h | 10 | ||||
-rw-r--r-- | src/msg/Pipe.cc | 21 | ||||
-rw-r--r-- | src/msg/Pipe.h | 2 | ||||
-rw-r--r-- | src/msg/SimpleMessenger.cc | 13 | ||||
-rw-r--r-- | src/msg/SimpleMessenger.h | 6 | ||||
-rw-r--r-- | src/osd/OSD.cc | 37 | ||||
-rw-r--r-- | src/osd/OSD.h | 25 | ||||
-rw-r--r-- | src/osdc/Objecter.cc | 7 | ||||
-rw-r--r-- | src/osdc/Objecter.h | 4 |
24 files changed, 98 insertions, 141 deletions
diff --git a/src/client/Client.cc b/src/client/Client.cc index 21cf2678607..204dc98d74d 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -776,7 +776,7 @@ void Client::update_dir_dist(Inode *in, DirStat *dst) void Client::insert_readdir_results(MetaRequest *request, MetaSession *session, Inode *diri) { MClientReply *reply = request->reply; - Connection *con = request->reply->get_connection(); + ConnectionRef con = request->reply->get_connection(); uint64_t features = con->get_features(); assert(request->readdir_result.empty()); @@ -918,7 +918,7 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session) return NULL; } - Connection *con = request->reply->get_connection(); + ConnectionRef con = request->reply->get_connection(); uint64_t features = con->get_features(); ldout(cct, 10) << " features 0x" << hex << features << dec << dendl; @@ -1522,7 +1522,7 @@ void Client::handle_client_session(MClientSession *m) int from = m->get_source().num(); ldout(cct, 10) << "handle_client_session " << *m << " from mds." << from << dendl; - MetaSession *session = _get_mds_session(from, m->get_connection()); + MetaSession *session = _get_mds_session(from, m->get_connection().get()); if (!session) { ldout(cct, 10) << " discarding session message from sessionless mds " << m->get_source_inst() << dendl; m->put(); @@ -1639,7 +1639,7 @@ MClientRequest* Client::build_client_request(MetaRequest *request) void Client::handle_client_request_forward(MClientRequestForward *fwd) { int mds = fwd->get_source().num(); - MetaSession *session = _get_mds_session(mds, fwd->get_connection()); + MetaSession *session = _get_mds_session(mds, fwd->get_connection().get()); if (!session) { fwd->put(); return; @@ -1677,7 +1677,7 @@ void Client::handle_client_request_forward(MClientRequestForward *fwd) void Client::handle_client_reply(MClientReply *reply) { int mds_num = reply->get_source().num(); - MetaSession *session = _get_mds_session(mds_num, reply->get_connection()); + MetaSession *session = _get_mds_session(mds_num, reply->get_connection().get()); if (!session) { reply->put(); return; @@ -1879,7 +1879,6 @@ void Client::handle_mds_map(MMDSMap* m) mds_sessions.count(p->first)) { MetaSession *session = mds_sessions[p->first]; session->inst = mdsmap->get_inst(p->first); - session->con->put(); session->con = messenger->get_connection(session->inst); send_reconnect(session); } @@ -2023,7 +2022,7 @@ void Client::handle_lease(MClientLease *m) assert(m->get_action() == CEPH_MDS_LEASE_REVOKE); int mds = m->get_source().num(); - MetaSession *session = _get_mds_session(mds, m->get_connection()); + MetaSession *session = _get_mds_session(mds, m->get_connection().get()); if (!session) { m->put(); return; @@ -3218,7 +3217,7 @@ void Client::handle_snap(MClientSnap *m) { ldout(cct, 10) << "handle_snap " << *m << dendl; int mds = m->get_source().num(); - MetaSession *session = _get_mds_session(mds, m->get_connection()); + MetaSession *session = _get_mds_session(mds, m->get_connection().get()); if (!session) { m->put(); return; @@ -3294,7 +3293,7 @@ void Client::handle_snap(MClientSnap *m) void Client::handle_caps(MClientCaps *m) { int mds = m->get_source().num(); - MetaSession *session = _get_mds_session(mds, m->get_connection()); + MetaSession *session = _get_mds_session(mds, m->get_connection().get()); if (!session) { m->put(); return; diff --git a/src/client/MetaSession.cc b/src/client/MetaSession.cc index 6c1685217d2..ee38db36d97 100644 --- a/src/client/MetaSession.cc +++ b/src/client/MetaSession.cc @@ -36,6 +36,4 @@ MetaSession::~MetaSession() { if (release) release->put(); - if (con) - con->put(); } diff --git a/src/client/MetaSession.h b/src/client/MetaSession.h index 0cff7586c36..1f6a1240617 100644 --- a/src/client/MetaSession.h +++ b/src/client/MetaSession.h @@ -19,7 +19,7 @@ class MClientCapRelease; struct MetaSession { int mds_num; - Connection *con; + ConnectionRef con; version_t seq; uint64_t cap_gen; utime_t cap_ttl, last_cap_renew_request; diff --git a/src/mds/MDS.h b/src/mds/MDS.h index 4e69dcaf8f9..9e3e2dae9c3 100644 --- a/src/mds/MDS.h +++ b/src/mds/MDS.h @@ -346,8 +346,14 @@ class MDS : public Dispatcher { void send_message_client_counted(Message *m, client_t client); void send_message_client_counted(Message *m, Session *session); void send_message_client_counted(Message *m, Connection *connection); + void send_message_client_counted(Message *m, const ConnectionRef& con) { + send_message_client_counted(m, con.get()); + } void send_message_client(Message *m, Session *session); void send_message(Message *m, Connection *c); + void send_message(Message *m, const ConnectionRef& c) { + send_message(m, c.get()); + } // start up, shutdown int init(int wanted_state=MDSMap::STATE_BOOT); diff --git a/src/mds/Server.cc b/src/mds/Server.cc index abeea2c12d5..253c56d7a37 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -306,7 +306,7 @@ void Server::_session_logged(Session *session, uint64_t state_seq, bool open, ve // ms_handle_remote_reset() and realize they had in fact closed. // do this *before* sending the message to avoid a possible // race. - mds->messenger->mark_disposable(session->connection); + mds->messenger->mark_disposable(session->connection.get()); // reset session mds->send_message_client(new MClientSession(CEPH_SESSION_CLOSE), session); @@ -901,8 +901,7 @@ void Server::reply_request(MDRequest *mdr, MClientReply *reply, CInode *tracei, } // note client connection to direct my reply - Connection *client_con = req->get_connection(); - client_con->get(); + ConnectionRef client_con = req->get_connection(); // drop non-rdlocks before replying, so that we can issue leases mdcache->request_drop_non_rdlocks(mdr); @@ -929,7 +928,6 @@ void Server::reply_request(MDRequest *mdr, MClientReply *reply, CInode *tracei, reply->set_mdsmap_epoch(mds->mdsmap->get_epoch()); messenger->send_message(reply, client_con); } - client_con->put(); // clean up request mdcache->request_finish(mdr); diff --git a/src/mds/SessionMap.h b/src/mds/SessionMap.h index 905a99b81c1..f3e7e0990e1 100644 --- a/src/mds/SessionMap.h +++ b/src/mds/SessionMap.h @@ -84,7 +84,7 @@ private: public: session_info_t info; ///< durable bits - Connection *connection; + ConnectionRef connection; xlist<Session*>::item item_session_list; list<Message*> preopen_out_queue; ///< messages for client, queued before they connect diff --git a/src/mon/AuthMonitor.cc b/src/mon/AuthMonitor.cc index e4e0dacd8ba..ec639c6d783 100644 --- a/src/mon/AuthMonitor.cc +++ b/src/mon/AuthMonitor.cc @@ -479,7 +479,7 @@ bool AuthMonitor::prep_auth(MAuth *m, bool paxos_writable) // always send the latest monmap. if (m->monmap_epoch < mon->monmap->get_epoch()) - mon->send_latest_monmap(m->get_connection()); + mon->send_latest_monmap(m->get_connection().get()); proto = s->auth_handler->start_session(entity_name, indata, response_bl, caps_info); ret = 0; diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index b52ce603158..d7003c13520 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -139,7 +139,6 @@ int MonClient::get_monmap_privately() if (monmap.fsid.is_zero()) { messenger->mark_down(cur_con); // nope, clean that connection up - cur_con->put(); } } @@ -156,10 +155,7 @@ int MonClient::get_monmap_privately() hunting = true; // reset this to true! cur_mon.clear(); - if (cur_con) { - cur_con->put(); - cur_con = NULL; - } + cur_con.reset(NULL); if (!monmap.fsid.is_zero()) return 0; @@ -333,10 +329,8 @@ void MonClient::shutdown() } monc_lock.Lock(); timer.shutdown(); - if (cur_con) { - cur_con->put(); - cur_con = NULL; - } + + cur_con.reset(NULL); monc_lock.Unlock(); } @@ -495,7 +489,6 @@ void MonClient::_pick_new_mon() if (cur_con) { messenger->mark_down(cur_con); - cur_con->put(); } cur_con = messenger->get_connection(monmap.get_inst(cur_mon)); @@ -591,7 +584,7 @@ void MonClient::tick() if (now > sub_renew_after) _renew_subs(); - messenger->send_keepalive(cur_con); + messenger->send_keepalive(cur_con.get()); if (state == MC_STATE_HAVE_SESSION) { send_log(); diff --git a/src/mon/MonClient.h b/src/mon/MonClient.h index 04e6b0e44b5..b5333e1714a 100644 --- a/src/mon/MonClient.h +++ b/src/mon/MonClient.h @@ -63,7 +63,7 @@ private: Messenger *messenger; string cur_mon; - Connection *cur_con; + ConnectionRef cur_con; SimpleRNG rng; diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 5a3d6fbb7b4..986d8633cc3 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -211,8 +211,6 @@ Monitor::~Monitor() delete paxos; assert(session_map.sessions.empty()); delete mon_caps; - if (con_self) - con_self->put(); } @@ -2912,7 +2910,6 @@ void Monitor::forward_request_leader(PaxosServiceMessage *req) rr->tid = ++routed_request_tid; rr->client_inst = req->get_source_inst(); rr->con = req->get_connection(); - rr->con->get(); encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features rr->session = static_cast<MonSession *>(session->get()); routed_requests[rr->tid] = rr; @@ -2951,7 +2948,7 @@ void Monitor::handle_forward(MForward *m) s->caps = m->client_caps; dout(10) << " caps are " << s->caps << dendl; - s->proxy_con = m->get_connection()->get(); + s->proxy_con = m->get_connection(); s->proxy_tid = m->tid; PaxosServiceMessage *req = m->msg; @@ -2972,8 +2969,7 @@ void Monitor::handle_forward(MForward *m) or the Session. And due to the special nature of this message, nobody refers to the Connection via the Session. So, clear out that half of the ref loop.*/ - s->con->put(); - s->con = NULL; + s->con.reset(NULL); dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl; @@ -3105,7 +3101,6 @@ void Monitor::resend_routed_requests() if (mon == rank) { dout(10) << " requeue for self tid " << rr->tid << " " << *req << dendl; req->set_connection(rr->con); - rr->con->get(); retry.push_back(new C_RetryMessage(this, req)); delete rr; } else { @@ -3172,7 +3167,7 @@ void Monitor::waitlist_or_zap_client(Message *m) * 3) command messages. We want to accept these under all possible * circumstances. */ - Connection *con = m->get_connection(); + ConnectionRef con = m->get_connection(); utime_t too_old = ceph_clock_now(g_ceph_context); too_old -= g_ceph_context->_conf->mon_lease; if (m->get_recv_stamp() > too_old && @@ -3195,7 +3190,7 @@ bool Monitor::_ms_dispatch(Message *m) return true; } - Connection *connection = m->get_connection(); + ConnectionRef connection = m->get_connection(); MonSession *s = NULL; MonCap caps; EntityName entity_name; @@ -3219,7 +3214,7 @@ bool Monitor::_ms_dispatch(Message *m) return true; } dout(10) << "do not have session, making new one" << dendl; - s = session_map.new_session(m->get_source_inst(), m->get_connection()); + s = session_map.new_session(m->get_source_inst(), m->get_connection().get()); m->get_connection()->set_priv(s->get()); dout(10) << "ms_dispatch new session " << s << " for " << s->inst << dendl; @@ -3921,7 +3916,7 @@ void Monitor::check_sub(Subscription *sub) { dout(10) << "check_sub monmap next " << sub->next << " have " << monmap->get_epoch() << dendl; if (sub->next <= monmap->get_epoch()) { - send_latest_monmap(sub->session->con); + send_latest_monmap(sub->session->con.get()); if (sub->onetime) session_map.remove_sub(sub); else @@ -3942,7 +3937,7 @@ void Monitor::send_latest_monmap(Connection *con) void Monitor::handle_mon_get_map(MMonGetMap *m) { dout(10) << "handle_mon_get_map" << dendl; - send_latest_monmap(m->get_connection()); + send_latest_monmap(m->get_connection().get()); m->put(); } diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index e860f8ea375..9e94fa90aaa 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -109,7 +109,7 @@ public: string name; int rank; Messenger *messenger; - Connection *con_self; + ConnectionRef con_self; Mutex lock; SafeTimer timer; @@ -1312,14 +1312,12 @@ public: uint64_t tid; bufferlist request_bl; MonSession *session; - Connection *con; + ConnectionRef con; entity_inst_t client_inst; ~RoutedRequest() { if (session) session->put(); - if (con) - con->put(); } }; uint64_t routed_request_tid; diff --git a/src/mon/PGMonitor.cc b/src/mon/PGMonitor.cc index 58b1fc7bd52..1178fcc381b 100644 --- a/src/mon/PGMonitor.cc +++ b/src/mon/PGMonitor.cc @@ -1604,6 +1604,6 @@ void PGMonitor::check_sub(Subscription *sub) { if (sub->type == "osd_pg_creates") { send_pg_creates(sub->session->inst.name.num(), - sub->session->con); + sub->session->con.get()); } } diff --git a/src/mon/Session.h b/src/mon/Session.h index c8a7f3c4595..7df6001581c 100644 --- a/src/mon/Session.h +++ b/src/mon/Session.h @@ -38,7 +38,7 @@ struct Subscription { }; struct MonSession : public RefCountedObject { - Connection *con; + ConnectionRef con; entity_inst_t inst; utime_t until; utime_t time_established; @@ -54,21 +54,17 @@ struct MonSession : public RefCountedObject { AuthServiceHandler *auth_handler; - Connection *proxy_con; + ConnectionRef proxy_con; uint64_t proxy_tid; MonSession(const entity_inst_t& i, Connection *c) : - con(c->get()), inst(i), closed(false), item(this), + con(c), inst(i), closed(false), item(this), auid(0), global_id(0), notified_global_id(0), auth_handler(NULL), proxy_con(NULL), proxy_tid(0) { time_established = ceph_clock_now(g_ceph_context); } ~MonSession() { - if (con) - con->put(); - if (proxy_con) - proxy_con->put(); //generic_dout(0) << "~MonSession " << this << dendl; // we should have been removed before we get destructed; see MonSessionMap::remove_session() assert(!item.is_on_list()); diff --git a/src/msg/DispatchQueue.cc b/src/msg/DispatchQueue.cc index 67bb52e6c7b..a92c357beb5 100644 --- a/src/msg/DispatchQueue.cc +++ b/src/msg/DispatchQueue.cc @@ -54,7 +54,7 @@ void DispatchQueue::enqueue(Message *m, int priority, uint64_t id) void DispatchQueue::local_delivery(Message *m, int priority) { Mutex::Locker l(lock); - m->set_connection(msgr->local_connection->get()); + m->set_connection(msgr->local_connection.get()); add_arrival(m); if (priority >= CEPH_MSG_PRIO_LOW) { mqueue.enqueue_strict( diff --git a/src/msg/Message.h b/src/msg/Message.h index aca91184141..0089751c7ed 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -161,7 +161,7 @@ class Messenger; -struct Connection : public RefCountedObject { +struct Connection : private RefCountedObject { Mutex lock; Messenger *msgr; RefCountedObject *priv; @@ -174,6 +174,8 @@ struct Connection : public RefCountedObject { int rx_buffers_version; map<tid_t,pair<bufferlist,int> > rx_buffers; + friend class boost::intrusive_ptr<Connection>; + public: Connection(Messenger *m) : lock("Connection::lock"), @@ -184,6 +186,9 @@ public: pipe(NULL), failed(false), rx_buffers_version(0) { + // we are managed exlusively by ConnectionRef; make it so you can + // ConnectionRef foo = new Connection; + nref.set(0); } ~Connection() { //generic_dout(0) << "~Connection " << this << dendl; @@ -195,10 +200,6 @@ public: pipe->put(); } - Connection *get() { - return static_cast<Connection *>(RefCountedObject::get()); - } - void set_priv(RefCountedObject *o) { Mutex::Locker l(lock); if (priv) @@ -304,7 +305,7 @@ protected: /* time at which message was fully read */ utime_t recv_complete_stamp; - Connection *connection; + ConnectionRef connection; // release our size in bytes back to this throttler when our payload // is adjusted or when we are destroyed. @@ -352,18 +353,14 @@ public: protected: virtual ~Message() { assert(nref.read() == 0); - if (connection) - connection->put(); if (byte_throttler) byte_throttler->put(payload.length() + middle.length() + data.length()); if (msg_throttler) msg_throttler->put(); } public: - Connection *get_connection() { return connection; } - void set_connection(Connection *c) { - if (connection) - connection->put(); + const ConnectionRef& get_connection() { return connection; } + void set_connection(const ConnectionRef& c) { connection = c; } void set_byte_throttler(Throttle *t) { byte_throttler = t; } diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 13d34611e19..28643e10767 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -408,6 +408,9 @@ public: * @return 0 on success, or -errno on failure. */ virtual int send_message(Message *m, Connection *con) = 0; + int send_message(Message *m, const ConnectionRef& con) { + return send_message(m, con.get()); + } /** * Lazily queue the given Message for the given entity. Unlike with * send_message(), lazy_send_message() will not establish a @@ -450,11 +453,11 @@ public: * * @param dest The entity to get a connection for. */ - virtual Connection *get_connection(const entity_inst_t& dest) = 0; + virtual ConnectionRef get_connection(const entity_inst_t& dest) = 0; /** * Get the Connection object associated with ourselves. */ - virtual Connection *get_loopback_connection() = 0; + virtual ConnectionRef get_loopback_connection() = 0; /** * Send a "keepalive" ping to the given dest, if it has a working Connection. * If the Messenger doesn't already have a Connection, or if the underlying @@ -494,6 +497,9 @@ public: * @param con The Connection to mark down. */ virtual void mark_down(Connection *con) = 0; + void mark_down(const ConnectionRef& con) { + mark_down(con.get()); + } /** * Unlike mark_down, this function will try and deliver * all messages before ending the connection, and it will use diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc index 42d461ac2f8..789f7906254 100644 --- a/src/msg/Pipe.cc +++ b/src/msg/Pipe.cc @@ -72,7 +72,7 @@ Pipe::Pipe(SimpleMessenger *r, int st, Connection *con) connect_seq(0), peer_global_seq(0), out_seq(0), in_seq(0), in_seq_acked(0) { if (con) { - connection_state = con->get(); + connection_state = con; connection_state->reset_pipe(this); } else { connection_state = new Connection(msgr); @@ -93,8 +93,6 @@ Pipe::~Pipe() { assert(out_q.empty()); assert(sent.empty()); - if (connection_state) - connection_state->put(); delete session_security; delete delay_thread; } @@ -390,7 +388,7 @@ int Pipe::accept() // Check the authorizer. If not good, bail out. - if (!msgr->verify_authorizer(connection_state, peer_type, connect.authorizer_protocol, authorizer, + if (!msgr->verify_authorizer(connection_state.get(), peer_type, connect.authorizer_protocol, authorizer, authorizer_reply, authorizer_valid, session_key) || !authorizer_valid) { ldout(msgr->cct,0) << "accept: got bad authorizer" << dendl; @@ -569,8 +567,7 @@ int Pipe::accept() // drop my Connection, and take a ref to the existing one. do not // clear existing->connection_state, since read_message and // write_message both dereference it without pipe_lock. - connection_state->put(); - connection_state = existing->connection_state->get(); + connection_state = existing->connection_state; // make existing Connection reference us existing->connection_state->reset_pipe(this); @@ -622,7 +619,7 @@ int Pipe::accept() connection_state->get_features()); // notify - msgr->dispatch_queue.queue_accept(connection_state); + msgr->dispatch_queue.queue_accept(connection_state.get()); // ok! if (msgr->dispatch_queue.stop) @@ -1041,7 +1038,7 @@ int Pipe::connect() session_security = NULL; } - msgr->dispatch_queue.queue_connect(connection_state); + msgr->dispatch_queue.queue_connect(connection_state.get()); if (!reader_running) { ldout(msgr->cct,20) << "connect starting reader" << dendl; @@ -1207,7 +1204,7 @@ void Pipe::fault(bool onread) assert(connection_state); connection_state->clear_pipe(this); - msgr->dispatch_queue.queue_reset(connection_state); + msgr->dispatch_queue.queue_reset(connection_state.get()); return; } @@ -1273,7 +1270,7 @@ void Pipe::was_session_reset() delay_thread->discard(); discard_out_queue(); - msgr->dispatch_queue.queue_remote_reset(connection_state); + msgr->dispatch_queue.queue_remote_reset(connection_state.get()); if (randomize_out_seq()) { lsubdout(msgr->cct,ms,15) << "was_session_reset(): Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl; @@ -1383,7 +1380,7 @@ void Pipe::reader() continue; } - m->set_connection(connection_state->get()); + m->set_connection(connection_state.get()); // note last received message. in_seq = m->get_seq(); @@ -1516,7 +1513,7 @@ void Pipe::writer() } // associate message with Connection (for benefit of encode_payload) - m->set_connection(connection_state->get()); + m->set_connection(connection_state.get()); ldout(msgr->cct,20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl; diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h index e2a155a6038..b359bc2caf7 100644 --- a/src/msg/Pipe.h +++ b/src/msg/Pipe.h @@ -150,7 +150,7 @@ class DispatchQueue; protected: friend class SimpleMessenger; - Connection *connection_state; + ConnectionRef connection_state; utime_t backoff; // backoff time diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index c9764fac324..6b1f309ac35 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -66,7 +66,6 @@ SimpleMessenger::~SimpleMessenger() assert(!did_bind); // either we didn't bind or we shut down the Accepter assert(rank_pipe.empty()); // we don't have any running Pipes. assert(reaper_stop && !reaper_started); // the reaper thread is stopped - local_connection->put(); } void SimpleMessenger::ready() @@ -112,7 +111,7 @@ int SimpleMessenger::_send_message(Message *m, const entity_inst_t& dest, lock.Lock(); Pipe *pipe = _lookup_pipe(dest.addr); - submit_message(m, (pipe ? pipe->connection_state : NULL), + submit_message(m, (pipe ? pipe->connection_state.get() : NULL), dest.addr, dest.name.type(), lazy); lock.Unlock(); return 0; @@ -357,12 +356,12 @@ bool SimpleMessenger::verify_authorizer(Connection *con, int peer_type, return ms_deliver_verify_authorizer(con, peer_type, protocol, authorizer, authorizer_reply, isvalid,session_key); } -Connection *SimpleMessenger::get_connection(const entity_inst_t& dest) +ConnectionRef SimpleMessenger::get_connection(const entity_inst_t& dest) { Mutex::Locker l(lock); if (my_inst.addr == dest.addr) { // local - return static_cast<Connection *>(local_connection->get()); + return local_connection; } // remote @@ -376,14 +375,14 @@ Connection *SimpleMessenger::get_connection(const entity_inst_t& dest) } Mutex::Locker l(pipe->pipe_lock); if (pipe->connection_state) - return static_cast<Connection *>(pipe->connection_state->get()); + return pipe->connection_state; // we failed too quickly! retry. FIXME. } } -Connection *SimpleMessenger::get_loopback_connection() +ConnectionRef SimpleMessenger::get_loopback_connection() { - return static_cast<Connection*>(local_connection->get()); + return local_connection; } void SimpleMessenger::submit_message(Message *m, Connection *con, diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 0d54d174965..47ee145aa5e 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -303,8 +303,8 @@ public: * @param dest The entity to get a connection for. * @return The requested Connection, as a pointer whose reference you own. */ - virtual Connection *get_connection(const entity_inst_t& dest); - virtual Connection *get_loopback_connection(); + virtual ConnectionRef get_connection(const entity_inst_t& dest); + virtual ConnectionRef get_loopback_connection(); /** * Send a "keepalive" ping to the given dest, if it has a working Connection. * If the Messenger doesn't already have a Connection, or if the underlying @@ -555,7 +555,7 @@ public: int timeout; /// con used for sending messages to ourselves - Connection *local_connection; + ConnectionRef local_connection; /** * @defgroup SimpleMessenger internals diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index b482865b3d1..a18c4459719 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -2512,11 +2512,9 @@ void OSD::_add_heartbeat_peer(int p) hi->peer = p; HeartbeatSession *s = new HeartbeatSession(p); hi->con_back = cons.first.get(); - hi->con_back->get(); hi->con_back->set_priv(s); if (cons.second) { hi->con_front = cons.second.get(); - hi->con_front->get(); hi->con_front->set_priv(s->get()); } dout(10) << "_add_heartbeat_peer: new peer osd." << p @@ -2538,10 +2536,8 @@ void OSD::_remove_heartbeat_peer(int n) << " " << (q->second.con_front ? q->second.con_front->get_peer_addr() : entity_addr_t()) << dendl; hbclient_messenger->mark_down(q->second.con_back); - q->second.con_back->put(); if (q->second.con_front) { hbclient_messenger->mark_down(q->second.con_front); - q->second.con_front->put(); } heartbeat_peers.erase(q); } @@ -2671,10 +2667,8 @@ void OSD::reset_heartbeat_peers() while (!heartbeat_peers.empty()) { HeartbeatInfo& hi = heartbeat_peers.begin()->second; hbclient_messenger->mark_down(hi.con_back); - hi.con_back->put(); if (hi.con_front) { hbclient_messenger->mark_down(hi.con_front); - hi.con_front->put(); } heartbeat_peers.erase(heartbeat_peers.begin()); } @@ -2966,22 +2960,18 @@ bool OSD::heartbeat_reset(Connection *con) << ", reopening" << dendl; if (con != p->second.con_back) { hbclient_messenger->mark_down(p->second.con_back); - p->second.con_back->put(); } - p->second.con_back = NULL; + p->second.con_back.reset(NULL); if (p->second.con_front && con != p->second.con_front) { hbclient_messenger->mark_down(p->second.con_front); - p->second.con_front->put(); } - p->second.con_front = NULL; + p->second.con_front.reset(NULL); pair<ConnectionRef,ConnectionRef> newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch); if (newcon.first) { p->second.con_back = newcon.first.get(); - p->second.con_back->get(); p->second.con_back->set_priv(s->get()); if (newcon.second) { p->second.con_front = newcon.second.get(); - p->second.con_front->get(); p->second.con_front->set_priv(s->get()); } } else { @@ -3553,10 +3543,7 @@ ConnectionRef OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch) next_osdmap->get_info(peer).up_from > from_epoch) { return NULL; } - ConnectionRef ret( - osd->cluster_messenger->get_connection(next_osdmap->get_cluster_inst(peer))); - ret->put(); // Ref from get_connection - return ret; + return osd->cluster_messenger->get_connection(next_osdmap->get_cluster_inst(peer)); } pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t from_epoch) @@ -3572,10 +3559,7 @@ pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t f return ret; } ret.first = osd->hbclient_messenger->get_connection(next_osdmap->get_hb_back_inst(peer)); - ret.first->put(); // Ref from get_connection ret.second = osd->hbclient_messenger->get_connection(next_osdmap->get_hb_front_inst(peer)); - if (ret.second) - ret.second->put(); // Ref from get_connection return ret; } @@ -3759,7 +3743,7 @@ void OSD::handle_command(MMonCommand *m) void OSD::handle_command(MCommand *m) { - Connection *con = m->get_connection(); + ConnectionRef con = m->get_connection(); Session *session = static_cast<Session *>(con->get_priv()); if (!session) { client_messenger->send_message(new MCommandReply(m, -EPERM), con); @@ -3776,7 +3760,7 @@ void OSD::handle_command(MCommand *m) return; } - Command *c = new Command(m->cmd, m->get_tid(), m->get_data(), con); + Command *c = new Command(m->cmd, m->get_tid(), m->get_data(), con.get()); command_wq.queue(c); m->put(); @@ -4219,9 +4203,8 @@ bool OSD::heartbeat_dispatch(Message *m) case CEPH_MSG_OSD_MAP: { - Connection *self = cluster_messenger->get_loopback_connection(); + ConnectionRef self = cluster_messenger->get_loopback_connection(); cluster_messenger->send_message(m, self); - self->put(); } break; @@ -4767,10 +4750,8 @@ void OSD::note_down_osd(int peer) map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(peer); if (p != heartbeat_peers.end()) { hbclient_messenger->mark_down(p->second.con_back); - p->second.con_back->put(); if (p->second.con_front) { hbclient_messenger->mark_down(p->second.con_front); - p->second.con_front->put(); } heartbeat_peers.erase(p); } @@ -6675,7 +6656,7 @@ void OSD::handle_op(OpRequestRef op) return; } // share our map with sender, if they're old - _share_map_incoming(m->get_source(), m->get_connection(), m->get_map_epoch(), + _share_map_incoming(m->get_source(), m->get_connection().get(), m->get_map_epoch(), static_cast<Session *>(m->get_connection()->get_priv())); if (op->rmw_flags == 0) { @@ -6803,7 +6784,7 @@ void OSD::handle_sub_op(OpRequestRef op) return; // share our map with sender, if they're old - _share_map_incoming(m->get_source(), m->get_connection(), m->map_epoch, + _share_map_incoming(m->get_source(), m->get_connection().get(), m->map_epoch, static_cast<Session*>(m->get_connection()->get_priv())); if (service.splitting(pgid)) { @@ -6840,7 +6821,7 @@ void OSD::handle_sub_op_reply(OpRequestRef op) if (!require_same_or_newer_map(op, m->get_map_epoch())) return; // share our map with sender, if they're old - _share_map_incoming(m->get_source(), m->get_connection(), m->get_map_epoch(), + _share_map_incoming(m->get_source(), m->get_connection().get(), m->get_map_epoch(), static_cast<Session*>(m->get_connection()->get_priv())); PG *pg = _have_pg(pgid) ? _lookup_pg(pgid) : NULL; diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 0e35250b79a..3d8218e6aa1 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -303,9 +303,15 @@ public: void send_message_osd_cluster(Message *m, Connection *con) { cluster_messenger->send_message(m, con); } + void send_message_osd_cluster(Message *m, const ConnectionRef& con) { + cluster_messenger->send_message(m, con.get()); + } void send_message_osd_client(Message *m, Connection *con) { client_messenger->send_message(m, con); } + void send_message_osd_client(Message *m, const ConnectionRef& con) { + client_messenger->send_message(m, con.get()); + } entity_name_t get_cluster_msgr_name() { return cluster_messenger->get_myname(); } @@ -688,7 +694,7 @@ public: OSDCap caps; int64_t auid; epoch_t last_sent_epoch; - Connection *con; + ConnectionRef con; WatchConState wstate; Session() : auid(-1), last_sent_epoch(0), con(0) {} @@ -699,8 +705,8 @@ private: /// information about a heartbeat peer struct HeartbeatInfo { int peer; ///< peer - Connection *con_front; ///< peer connection (front) - Connection *con_back; ///< peer connection (back) + ConnectionRef con_front; ///< peer connection (front) + ConnectionRef con_back; ///< peer connection (back) utime_t first_tx; ///< time we sent our first ping request utime_t last_tx; ///< last time we sent a ping request utime_t last_rx_front; ///< last time we got a ping reply on the front side @@ -1242,17 +1248,10 @@ protected: vector<string> cmd; tid_t tid; bufferlist indata; - Connection *con; + ConnectionRef con; Command(vector<string>& c, tid_t t, bufferlist& bl, Connection *co) - : cmd(c), tid(t), indata(bl), con(co) { - if (con) - con->get(); - } - ~Command() { - if (con) - con->put(); - } + : cmd(c), tid(t), indata(bl), con(co) {} }; list<Command*> command_queue; struct CommandWQ : public ThreadPool::WorkQueue<Command> { @@ -1284,7 +1283,7 @@ protected: delete c; return; } - osd->do_command(c->con, c->tid, c->cmd, c->indata); + osd->do_command(c->con.get(), c->tid, c->cmd, c->indata); osd->osd_lock.Unlock(); delete c; } diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index aae2159d69f..0ec92e2641d 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -913,7 +913,6 @@ void Objecter::reopen_session(OSDSession *s) ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now " << inst << dendl; if (s->con) { messenger->mark_down(s->con); - s->con->put(); logger->inc(l_osdc_osd_session_close); } s->con = messenger->get_connection(inst); @@ -926,7 +925,6 @@ void Objecter::close_session(OSDSession *s) ldout(cct, 10) << "close_session for osd." << s->osd << dendl; if (s->con) { messenger->mark_down(s->con); - s->con->put(); logger->inc(l_osdc_osd_session_close); } s->ops.clear(); @@ -1362,8 +1360,6 @@ void Objecter::finish_op(Op *op) op->session_item.remove_myself(); if (op->budgeted) put_op_budget(op); - if (op->con) - op->con->put(); ops.erase(op->tid); logger->set(l_osdc_op_active, ops.size()); @@ -1387,11 +1383,10 @@ void Objecter::send_op(Op *op) if (op->con) { ldout(cct, 20) << " revoking rx buffer for " << op->tid << " on " << op->con << dendl; op->con->revoke_rx_buffer(op->tid); - op->con->put(); } if (op->outbl && op->outbl->length()) { ldout(cct, 20) << " posting rx buffer for " << op->tid << " on " << op->session->con << dendl; - op->con = op->session->con->get(); + op->con = op->session->con; op->con->post_rx_buffer(op->tid, *op->outbl); } diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index b64c8f77ed0..850d974472e 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -738,7 +738,7 @@ public: vector<int> acting; bool used_replica; - Connection *con; // for rx buffer only + ConnectionRef con; // for rx buffer only vector<OSDOp> ops; @@ -1064,7 +1064,7 @@ public: xlist<CommandOp*> command_ops; int osd; int incarnation; - Connection *con; + ConnectionRef con; OSDSession(int o) : osd(o), incarnation(0), con(NULL) {} }; |