diff options
author | Sage Weil <sage@inktank.com> | 2013-06-13 16:21:21 -0700 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-06-13 16:21:21 -0700 |
commit | 763432a3ccfee7fdbc4686356164dab1680e354d (patch) | |
tree | b3e65058f25dad29889efd98fe7d7d9a2f95bd7f | |
parent | 392e86fbfffb95a0cf492f28756d71f29ed0d851 (diff) | |
parent | 0193f88519869e41362e1aef1dfd0673f7579144 (diff) | |
download | ceph-763432a3ccfee7fdbc4686356164dab1680e354d.tar.gz |
Merge pull request #356 from ceph/wip-leaks
Reviewed-by: Samuel Just <sam.just@inktank.com>
-rw-r--r-- | src/client/Client.cc | 17 | ||||
-rw-r--r-- | src/client/MetaSession.cc | 2 | ||||
-rw-r--r-- | src/client/MetaSession.h | 2 | ||||
-rw-r--r-- | src/mds/MDS.h | 6 | ||||
-rw-r--r-- | src/mds/Server.cc | 6 | ||||
-rw-r--r-- | src/mds/SessionMap.h | 2 | ||||
-rw-r--r-- | src/messages/MMonSync.h | 4 | ||||
-rw-r--r-- | src/mon/AuthMonitor.cc | 2 | ||||
-rw-r--r-- | src/mon/MonClient.cc | 27 | ||||
-rw-r--r-- | src/mon/MonClient.h | 2 | ||||
-rw-r--r-- | src/mon/Monitor.cc | 28 | ||||
-rw-r--r-- | src/mon/Monitor.h | 7 | ||||
-rw-r--r-- | src/mon/PGMonitor.cc | 2 | ||||
-rw-r--r-- | src/mon/PaxosService.cc | 5 | ||||
-rw-r--r-- | src/mon/Session.h | 10 | ||||
-rw-r--r-- | src/msg/DispatchQueue.cc | 52 | ||||
-rw-r--r-- | src/msg/Message.h | 25 | ||||
-rw-r--r-- | src/msg/Messenger.h | 10 | ||||
-rw-r--r-- | src/msg/Pipe.cc | 30 | ||||
-rw-r--r-- | src/msg/Pipe.h | 2 | ||||
-rw-r--r-- | src/msg/SimpleMessenger.cc | 18 | ||||
-rw-r--r-- | src/msg/SimpleMessenger.h | 6 | ||||
-rw-r--r-- | src/osd/ClassHandler.cc | 7 | ||||
-rw-r--r-- | src/osd/ClassHandler.h | 3 | ||||
-rw-r--r-- | src/osd/OSD.cc | 41 | ||||
-rw-r--r-- | src/osd/OSD.h | 25 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 1 | ||||
-rw-r--r-- | src/osdc/Objecter.cc | 18 | ||||
-rw-r--r-- | src/osdc/Objecter.h | 4 |
29 files changed, 185 insertions, 179 deletions
diff --git a/src/client/Client.cc b/src/client/Client.cc index 21cf2678607..204dc98d74d 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -776,7 +776,7 @@ void Client::update_dir_dist(Inode *in, DirStat *dst) void Client::insert_readdir_results(MetaRequest *request, MetaSession *session, Inode *diri) { MClientReply *reply = request->reply; - Connection *con = request->reply->get_connection(); + ConnectionRef con = request->reply->get_connection(); uint64_t features = con->get_features(); assert(request->readdir_result.empty()); @@ -918,7 +918,7 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session) return NULL; } - Connection *con = request->reply->get_connection(); + ConnectionRef con = request->reply->get_connection(); uint64_t features = con->get_features(); ldout(cct, 10) << " features 0x" << hex << features << dec << dendl; @@ -1522,7 +1522,7 @@ void Client::handle_client_session(MClientSession *m) int from = m->get_source().num(); ldout(cct, 10) << "handle_client_session " << *m << " from mds." << from << dendl; - MetaSession *session = _get_mds_session(from, m->get_connection()); + MetaSession *session = _get_mds_session(from, m->get_connection().get()); if (!session) { ldout(cct, 10) << " discarding session message from sessionless mds " << m->get_source_inst() << dendl; m->put(); @@ -1639,7 +1639,7 @@ MClientRequest* Client::build_client_request(MetaRequest *request) void Client::handle_client_request_forward(MClientRequestForward *fwd) { int mds = fwd->get_source().num(); - MetaSession *session = _get_mds_session(mds, fwd->get_connection()); + MetaSession *session = _get_mds_session(mds, fwd->get_connection().get()); if (!session) { fwd->put(); return; @@ -1677,7 +1677,7 @@ void Client::handle_client_request_forward(MClientRequestForward *fwd) void Client::handle_client_reply(MClientReply *reply) { int mds_num = reply->get_source().num(); - MetaSession *session = _get_mds_session(mds_num, reply->get_connection()); + MetaSession *session = _get_mds_session(mds_num, reply->get_connection().get()); if (!session) { reply->put(); return; @@ -1879,7 +1879,6 @@ void Client::handle_mds_map(MMDSMap* m) mds_sessions.count(p->first)) { MetaSession *session = mds_sessions[p->first]; session->inst = mdsmap->get_inst(p->first); - session->con->put(); session->con = messenger->get_connection(session->inst); send_reconnect(session); } @@ -2023,7 +2022,7 @@ void Client::handle_lease(MClientLease *m) assert(m->get_action() == CEPH_MDS_LEASE_REVOKE); int mds = m->get_source().num(); - MetaSession *session = _get_mds_session(mds, m->get_connection()); + MetaSession *session = _get_mds_session(mds, m->get_connection().get()); if (!session) { m->put(); return; @@ -3218,7 +3217,7 @@ void Client::handle_snap(MClientSnap *m) { ldout(cct, 10) << "handle_snap " << *m << dendl; int mds = m->get_source().num(); - MetaSession *session = _get_mds_session(mds, m->get_connection()); + MetaSession *session = _get_mds_session(mds, m->get_connection().get()); if (!session) { m->put(); return; @@ -3294,7 +3293,7 @@ void Client::handle_snap(MClientSnap *m) void Client::handle_caps(MClientCaps *m) { int mds = m->get_source().num(); - MetaSession *session = _get_mds_session(mds, m->get_connection()); + MetaSession *session = _get_mds_session(mds, m->get_connection().get()); if (!session) { m->put(); return; diff --git a/src/client/MetaSession.cc b/src/client/MetaSession.cc index 6c1685217d2..ee38db36d97 100644 --- a/src/client/MetaSession.cc +++ b/src/client/MetaSession.cc @@ -36,6 +36,4 @@ MetaSession::~MetaSession() { if (release) release->put(); - if (con) - con->put(); } diff --git a/src/client/MetaSession.h b/src/client/MetaSession.h index 0cff7586c36..1f6a1240617 100644 --- a/src/client/MetaSession.h +++ b/src/client/MetaSession.h @@ -19,7 +19,7 @@ class MClientCapRelease; struct MetaSession { int mds_num; - Connection *con; + ConnectionRef con; version_t seq; uint64_t cap_gen; utime_t cap_ttl, last_cap_renew_request; diff --git a/src/mds/MDS.h b/src/mds/MDS.h index 4e69dcaf8f9..9e3e2dae9c3 100644 --- a/src/mds/MDS.h +++ b/src/mds/MDS.h @@ -346,8 +346,14 @@ class MDS : public Dispatcher { void send_message_client_counted(Message *m, client_t client); void send_message_client_counted(Message *m, Session *session); void send_message_client_counted(Message *m, Connection *connection); + void send_message_client_counted(Message *m, const ConnectionRef& con) { + send_message_client_counted(m, con.get()); + } void send_message_client(Message *m, Session *session); void send_message(Message *m, Connection *c); + void send_message(Message *m, const ConnectionRef& c) { + send_message(m, c.get()); + } // start up, shutdown int init(int wanted_state=MDSMap::STATE_BOOT); diff --git a/src/mds/Server.cc b/src/mds/Server.cc index abeea2c12d5..253c56d7a37 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -306,7 +306,7 @@ void Server::_session_logged(Session *session, uint64_t state_seq, bool open, ve // ms_handle_remote_reset() and realize they had in fact closed. // do this *before* sending the message to avoid a possible // race. - mds->messenger->mark_disposable(session->connection); + mds->messenger->mark_disposable(session->connection.get()); // reset session mds->send_message_client(new MClientSession(CEPH_SESSION_CLOSE), session); @@ -901,8 +901,7 @@ void Server::reply_request(MDRequest *mdr, MClientReply *reply, CInode *tracei, } // note client connection to direct my reply - Connection *client_con = req->get_connection(); - client_con->get(); + ConnectionRef client_con = req->get_connection(); // drop non-rdlocks before replying, so that we can issue leases mdcache->request_drop_non_rdlocks(mdr); @@ -929,7 +928,6 @@ void Server::reply_request(MDRequest *mdr, MClientReply *reply, CInode *tracei, reply->set_mdsmap_epoch(mds->mdsmap->get_epoch()); messenger->send_message(reply, client_con); } - client_con->put(); // clean up request mdcache->request_finish(mdr); diff --git a/src/mds/SessionMap.h b/src/mds/SessionMap.h index 905a99b81c1..f3e7e0990e1 100644 --- a/src/mds/SessionMap.h +++ b/src/mds/SessionMap.h @@ -84,7 +84,7 @@ private: public: session_info_t info; ///< durable bits - Connection *connection; + ConnectionRef connection; xlist<Session*>::item item_session_list; list<Message*> preopen_out_queue; ///< messages for client, queued before they connect diff --git a/src/messages/MMonSync.h b/src/messages/MMonSync.h index 1facaf1c004..6183578e167 100644 --- a/src/messages/MMonSync.h +++ b/src/messages/MMonSync.h @@ -131,12 +131,12 @@ public: MMonSync(uint32_t op) : Message(MSG_MON_SYNC, HEAD_VERSION, COMPAT_VERSION), - op(op), flags(0), version(0) + op(op), flags(0), version(0), crc(0) { } MMonSync(uint32_t op, bufferlist bl, uint8_t flags = 0) : Message(MSG_MON_SYNC, HEAD_VERSION, COMPAT_VERSION), - op(op), flags(flags), version(0), chunk_bl(bl) + op(op), flags(flags), version(0), chunk_bl(bl), crc(0) { } MMonSync(MMonSync *m) diff --git a/src/mon/AuthMonitor.cc b/src/mon/AuthMonitor.cc index e4e0dacd8ba..ec639c6d783 100644 --- a/src/mon/AuthMonitor.cc +++ b/src/mon/AuthMonitor.cc @@ -479,7 +479,7 @@ bool AuthMonitor::prep_auth(MAuth *m, bool paxos_writable) // always send the latest monmap. if (m->monmap_epoch < mon->monmap->get_epoch()) - mon->send_latest_monmap(m->get_connection()); + mon->send_latest_monmap(m->get_connection().get()); proto = s->auth_handler->start_session(entity_name, indata, response_bl, caps_info); ret = 0; diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index c0d2b5183c5..ba5f218dc11 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -139,11 +139,12 @@ int MonClient::get_monmap_privately() if (monmap.fsid.is_zero()) { messenger->mark_down(cur_con); // nope, clean that connection up - cur_con->put(); } } if (temp_msgr) { + messenger->mark_down(cur_con); + cur_con.reset(NULL); monc_lock.Unlock(); messenger->shutdown(); if (smessenger) @@ -156,10 +157,7 @@ int MonClient::get_monmap_privately() hunting = true; // reset this to true! cur_mon.clear(); - if (cur_con) { - cur_con->put(); - cur_con = NULL; - } + cur_con.reset(NULL); if (!monmap.fsid.is_zero()) return 0; @@ -320,15 +318,23 @@ int MonClient::init() void MonClient::shutdown() { + monc_lock.Lock(); + while (!version_requests.empty()) { + version_requests.begin()->second->context->complete(-ECANCELED); + delete version_requests.begin()->second; + version_requests.erase(version_requests.begin()); + } + + monc_lock.Unlock(); + if (initialized) { finisher.stop(); } monc_lock.Lock(); timer.shutdown(); - if (cur_con) { - cur_con->put(); - cur_con = NULL; - } + + messenger->mark_down(cur_con); + cur_con.reset(NULL); monc_lock.Unlock(); } @@ -494,7 +500,6 @@ void MonClient::_reopen_session(int rank, string name) if (cur_con) { messenger->mark_down(cur_con); - cur_con->put(); } cur_con = messenger->get_connection(monmap.get_inst(cur_mon)); @@ -581,7 +586,7 @@ void MonClient::tick() if (now > sub_renew_after) _renew_subs(); - messenger->send_keepalive(cur_con); + messenger->send_keepalive(cur_con.get()); if (state == MC_STATE_HAVE_SESSION) { send_log(); diff --git a/src/mon/MonClient.h b/src/mon/MonClient.h index 7d4a0548f9d..f82714ecf37 100644 --- a/src/mon/MonClient.h +++ b/src/mon/MonClient.h @@ -63,7 +63,7 @@ private: Messenger *messenger; string cur_mon; - Connection *cur_con; + ConnectionRef cur_con; SimpleRNG rng; diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 9910eab32f5..986d8633cc3 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -211,8 +211,6 @@ Monitor::~Monitor() delete paxos; assert(session_map.sessions.empty()); delete mon_caps; - if (con_self) - con_self->put(); } @@ -597,6 +595,7 @@ void Monitor::shutdown() } // clean up + paxos->shutdown(); for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) (*p)->shutdown(); health_monitor->shutdown(); @@ -2911,7 +2910,6 @@ void Monitor::forward_request_leader(PaxosServiceMessage *req) rr->tid = ++routed_request_tid; rr->client_inst = req->get_source_inst(); rr->con = req->get_connection(); - rr->con->get(); encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features rr->session = static_cast<MonSession *>(session->get()); routed_requests[rr->tid] = rr; @@ -2950,7 +2948,7 @@ void Monitor::handle_forward(MForward *m) s->caps = m->client_caps; dout(10) << " caps are " << s->caps << dendl; - s->proxy_con = m->get_connection()->get(); + s->proxy_con = m->get_connection(); s->proxy_tid = m->tid; PaxosServiceMessage *req = m->msg; @@ -2971,8 +2969,7 @@ void Monitor::handle_forward(MForward *m) or the Session. And due to the special nature of this message, nobody refers to the Connection via the Session. So, clear out that half of the ref loop.*/ - s->con->put(); - s->con = NULL; + s->con.reset(NULL); dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl; @@ -3104,7 +3101,6 @@ void Monitor::resend_routed_requests() if (mon == rank) { dout(10) << " requeue for self tid " << rr->tid << " " << *req << dendl; req->set_connection(rr->con); - rr->con->get(); retry.push_back(new C_RetryMessage(this, req)); delete rr; } else { @@ -3171,7 +3167,7 @@ void Monitor::waitlist_or_zap_client(Message *m) * 3) command messages. We want to accept these under all possible * circumstances. */ - Connection *con = m->get_connection(); + ConnectionRef con = m->get_connection(); utime_t too_old = ceph_clock_now(g_ceph_context); too_old -= g_ceph_context->_conf->mon_lease; if (m->get_recv_stamp() > too_old && @@ -3189,12 +3185,12 @@ bool Monitor::_ms_dispatch(Message *m) { bool ret = true; - if (state == STATE_SHUTDOWN) { + if (is_shutdown()) { m->put(); return true; } - Connection *connection = m->get_connection(); + ConnectionRef connection = m->get_connection(); MonSession *s = NULL; MonCap caps; EntityName entity_name; @@ -3218,7 +3214,7 @@ bool Monitor::_ms_dispatch(Message *m) return true; } dout(10) << "do not have session, making new one" << dendl; - s = session_map.new_session(m->get_source_inst(), m->get_connection()); + s = session_map.new_session(m->get_source_inst(), m->get_connection().get()); m->get_connection()->set_priv(s->get()); dout(10) << "ms_dispatch new session " << s << " for " << s->inst << dendl; @@ -3883,7 +3879,7 @@ bool Monitor::ms_handle_reset(Connection *con) { dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl; - if (state == STATE_SHUTDOWN) + if (is_shutdown()) return false; // ignore lossless monitor sessions @@ -3920,7 +3916,7 @@ void Monitor::check_sub(Subscription *sub) { dout(10) << "check_sub monmap next " << sub->next << " have " << monmap->get_epoch() << dendl; if (sub->next <= monmap->get_epoch()) { - send_latest_monmap(sub->session->con); + send_latest_monmap(sub->session->con.get()); if (sub->onetime) session_map.remove_sub(sub); else @@ -3941,7 +3937,7 @@ void Monitor::send_latest_monmap(Connection *con) void Monitor::handle_mon_get_map(MMonGetMap *m) { dout(10) << "handle_mon_get_map" << dendl; - send_latest_monmap(m->get_connection()); + send_latest_monmap(m->get_connection().get()); m->put(); } @@ -4171,7 +4167,7 @@ bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer, boo { dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id) << dendl; - if (state == STATE_SHUTDOWN) + if (is_shutdown()) return false; // we only connect to other monitors; every else connects to us. @@ -4235,7 +4231,7 @@ bool Monitor::ms_verify_authorizer(Connection *con, int peer_type, << " " << ceph_entity_type_name(peer_type) << " protocol " << protocol << dendl; - if (state == STATE_SHUTDOWN) + if (is_shutdown()) return false; if (peer_type == CEPH_ENTITY_TYPE_MON && diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index 6db49a9989a..9e94fa90aaa 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -109,7 +109,7 @@ public: string name; int rank; Messenger *messenger; - Connection *con_self; + ConnectionRef con_self; Mutex lock; SafeTimer timer; @@ -177,6 +177,7 @@ public: return sn; } + bool is_shutdown() const { return state == STATE_SHUTDOWN; } bool is_probing() const { return state == STATE_PROBING; } bool is_synchronizing() const { return state == STATE_SYNCHRONIZING; } bool is_electing() const { return state == STATE_ELECTING; } @@ -1311,14 +1312,12 @@ public: uint64_t tid; bufferlist request_bl; MonSession *session; - Connection *con; + ConnectionRef con; entity_inst_t client_inst; ~RoutedRequest() { if (session) session->put(); - if (con) - con->put(); } }; uint64_t routed_request_tid; diff --git a/src/mon/PGMonitor.cc b/src/mon/PGMonitor.cc index 58b1fc7bd52..1178fcc381b 100644 --- a/src/mon/PGMonitor.cc +++ b/src/mon/PGMonitor.cc @@ -1604,6 +1604,6 @@ void PGMonitor::check_sub(Subscription *sub) { if (sub->type == "osd_pg_creates") { send_pg_creates(sub->session->inst.name.num(), - sub->session->con); + sub->session->con.get()); } } diff --git a/src/mon/PaxosService.cc b/src/mon/PaxosService.cc index c32e77fee31..79ea0d41281 100644 --- a/src/mon/PaxosService.cc +++ b/src/mon/PaxosService.cc @@ -35,6 +35,11 @@ bool PaxosService::dispatch(PaxosServiceMessage *m) { dout(10) << "dispatch " << *m << " from " << m->get_orig_source_inst() << dendl; + if (mon->is_shutdown()) { + m->put(); + return true; + } + // make sure this message isn't forwarded from a previous election epoch if (m->rx_election_epoch && m->rx_election_epoch < mon->get_epoch()) { diff --git a/src/mon/Session.h b/src/mon/Session.h index c8a7f3c4595..7df6001581c 100644 --- a/src/mon/Session.h +++ b/src/mon/Session.h @@ -38,7 +38,7 @@ struct Subscription { }; struct MonSession : public RefCountedObject { - Connection *con; + ConnectionRef con; entity_inst_t inst; utime_t until; utime_t time_established; @@ -54,21 +54,17 @@ struct MonSession : public RefCountedObject { AuthServiceHandler *auth_handler; - Connection *proxy_con; + ConnectionRef proxy_con; uint64_t proxy_tid; MonSession(const entity_inst_t& i, Connection *c) : - con(c->get()), inst(i), closed(false), item(this), + con(c), inst(i), closed(false), item(this), auid(0), global_id(0), notified_global_id(0), auth_handler(NULL), proxy_con(NULL), proxy_tid(0) { time_established = ceph_clock_now(g_ceph_context); } ~MonSession() { - if (con) - con->put(); - if (proxy_con) - proxy_con->put(); //generic_dout(0) << "~MonSession " << this << dendl; // we should have been removed before we get destructed; see MonSessionMap::remove_session() assert(!item.is_on_list()); diff --git a/src/msg/DispatchQueue.cc b/src/msg/DispatchQueue.cc index 67bb52e6c7b..3adc038e87f 100644 --- a/src/msg/DispatchQueue.cc +++ b/src/msg/DispatchQueue.cc @@ -54,7 +54,7 @@ void DispatchQueue::enqueue(Message *m, int priority, uint64_t id) void DispatchQueue::local_delivery(Message *m, int priority) { Mutex::Locker l(lock); - m->set_connection(msgr->local_connection->get()); + m->set_connection(msgr->local_connection.get()); add_arrival(m); if (priority >= CEPH_MSG_PRIO_LOW) { mqueue.enqueue_strict( @@ -78,8 +78,8 @@ void DispatchQueue::local_delivery(Message *m, int priority) void DispatchQueue::entry() { lock.Lock(); - while (!stop) { - while (!mqueue.empty() && !stop) { + while (true) { + while (!mqueue.empty()) { QueueItem qitem = mqueue.dequeue(); if (!qitem.is_code()) remove_arrival(qitem.get_message()); @@ -104,29 +104,37 @@ void DispatchQueue::entry() } } else { Message *m = qitem.get_message(); - uint64_t msize = m->get_dispatch_throttle_size(); - m->set_dispatch_throttle_size(0); // clear it out, in case we requeue this message. - - ldout(cct,1) << "<== " << m->get_source_inst() - << " " << m->get_seq() - << " ==== " << *m - << " ==== " << m->get_payload().length() << "+" << m->get_middle().length() - << "+" << m->get_data().length() - << " (" << m->get_footer().front_crc << " " << m->get_footer().middle_crc - << " " << m->get_footer().data_crc << ")" - << " " << m << " con " << m->get_connection() - << dendl; - msgr->ms_deliver_dispatch(m); - - msgr->dispatch_throttle_release(msize); - - ldout(cct,20) << "done calling dispatch on " << m << dendl; + if (stop) { + ldout(cct,10) << " stop flag set, discarding " << m << " " << *m << dendl; + m->put(); + } else { + uint64_t msize = m->get_dispatch_throttle_size(); + m->set_dispatch_throttle_size(0); // clear it out, in case we requeue this message. + + ldout(cct,1) << "<== " << m->get_source_inst() + << " " << m->get_seq() + << " ==== " << *m + << " ==== " << m->get_payload().length() << "+" << m->get_middle().length() + << "+" << m->get_data().length() + << " (" << m->get_footer().front_crc << " " << m->get_footer().middle_crc + << " " << m->get_footer().data_crc << ")" + << " " << m << " con " << m->get_connection() + << dendl; + msgr->ms_deliver_dispatch(m); + + msgr->dispatch_throttle_release(msize); + + ldout(cct,20) << "done calling dispatch on " << m << dendl; + } } lock.Lock(); } - if (!stop) - cond.Wait(lock); //wait for something to be put on queue + if (stop) + break; + + // wait for something to be put on queue + cond.Wait(lock); } lock.Unlock(); } diff --git a/src/msg/Message.h b/src/msg/Message.h index aca91184141..2e3d59b886d 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -161,7 +161,7 @@ class Messenger; -struct Connection : public RefCountedObject { +struct Connection : private RefCountedObject { Mutex lock; Messenger *msgr; RefCountedObject *priv; @@ -174,6 +174,8 @@ struct Connection : public RefCountedObject { int rx_buffers_version; map<tid_t,pair<bufferlist,int> > rx_buffers; + friend class boost::intrusive_ptr<Connection>; + public: Connection(Messenger *m) : lock("Connection::lock"), @@ -184,6 +186,9 @@ public: pipe(NULL), failed(false), rx_buffers_version(0) { + // we are managed exlusively by ConnectionRef; make it so you can + // ConnectionRef foo = new Connection; + nref.set(0); } ~Connection() { //generic_dout(0) << "~Connection " << this << dendl; @@ -195,10 +200,6 @@ public: pipe->put(); } - Connection *get() { - return static_cast<Connection *>(RefCountedObject::get()); - } - void set_priv(RefCountedObject *o) { Mutex::Locker l(lock); if (priv) @@ -230,13 +231,15 @@ public: } return !failed; } - void clear_pipe(RefCountedObject *old_p) { + bool clear_pipe(RefCountedObject *old_p) { if (old_p == pipe) { Mutex::Locker l(lock); pipe->put(); pipe = NULL; failed = true; + return true; } + return false; } void reset_pipe(RefCountedObject *p) { Mutex::Locker l(lock); @@ -304,7 +307,7 @@ protected: /* time at which message was fully read */ utime_t recv_complete_stamp; - Connection *connection; + ConnectionRef connection; // release our size in bytes back to this throttler when our payload // is adjusted or when we are destroyed. @@ -352,18 +355,14 @@ public: protected: virtual ~Message() { assert(nref.read() == 0); - if (connection) - connection->put(); if (byte_throttler) byte_throttler->put(payload.length() + middle.length() + data.length()); if (msg_throttler) msg_throttler->put(); } public: - Connection *get_connection() { return connection; } - void set_connection(Connection *c) { - if (connection) - connection->put(); + const ConnectionRef& get_connection() { return connection; } + void set_connection(const ConnectionRef& c) { connection = c; } void set_byte_throttler(Throttle *t) { byte_throttler = t; } diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 13d34611e19..28643e10767 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -408,6 +408,9 @@ public: * @return 0 on success, or -errno on failure. */ virtual int send_message(Message *m, Connection *con) = 0; + int send_message(Message *m, const ConnectionRef& con) { + return send_message(m, con.get()); + } /** * Lazily queue the given Message for the given entity. Unlike with * send_message(), lazy_send_message() will not establish a @@ -450,11 +453,11 @@ public: * * @param dest The entity to get a connection for. */ - virtual Connection *get_connection(const entity_inst_t& dest) = 0; + virtual ConnectionRef get_connection(const entity_inst_t& dest) = 0; /** * Get the Connection object associated with ourselves. */ - virtual Connection *get_loopback_connection() = 0; + virtual ConnectionRef get_loopback_connection() = 0; /** * Send a "keepalive" ping to the given dest, if it has a working Connection. * If the Messenger doesn't already have a Connection, or if the underlying @@ -494,6 +497,9 @@ public: * @param con The Connection to mark down. */ virtual void mark_down(Connection *con) = 0; + void mark_down(const ConnectionRef& con) { + mark_down(con.get()); + } /** * Unlike mark_down, this function will try and deliver * all messages before ending the connection, and it will use diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc index 42d461ac2f8..4eb3d266937 100644 --- a/src/msg/Pipe.cc +++ b/src/msg/Pipe.cc @@ -44,6 +44,7 @@ ostream& Pipe::_pipe_prefix(std::ostream *_dout) { << " pgs=" << peer_global_seq << " cs=" << connect_seq << " l=" << policy.lossy + << " c=" << connection_state << ")."; } @@ -72,7 +73,7 @@ Pipe::Pipe(SimpleMessenger *r, int st, Connection *con) connect_seq(0), peer_global_seq(0), out_seq(0), in_seq(0), in_seq_acked(0) { if (con) { - connection_state = con->get(); + connection_state = con; connection_state->reset_pipe(this); } else { connection_state = new Connection(msgr); @@ -93,8 +94,6 @@ Pipe::~Pipe() { assert(out_q.empty()); assert(sent.empty()); - if (connection_state) - connection_state->put(); delete session_security; delete delay_thread; } @@ -390,7 +389,7 @@ int Pipe::accept() // Check the authorizer. If not good, bail out. - if (!msgr->verify_authorizer(connection_state, peer_type, connect.authorizer_protocol, authorizer, + if (!msgr->verify_authorizer(connection_state.get(), peer_type, connect.authorizer_protocol, authorizer, authorizer_reply, authorizer_valid, session_key) || !authorizer_valid) { ldout(msgr->cct,0) << "accept: got bad authorizer" << dendl; @@ -566,11 +565,13 @@ int Pipe::accept() replaced = true; if (!existing->policy.lossy) { + // queue a reset on the old connection + msgr->dispatch_queue.queue_reset(connection_state.get()); + // drop my Connection, and take a ref to the existing one. do not // clear existing->connection_state, since read_message and // write_message both dereference it without pipe_lock. - connection_state->put(); - connection_state = existing->connection_state->get(); + connection_state = existing->connection_state; // make existing Connection reference us existing->connection_state->reset_pipe(this); @@ -622,7 +623,7 @@ int Pipe::accept() connection_state->get_features()); // notify - msgr->dispatch_queue.queue_accept(connection_state); + msgr->dispatch_queue.queue_accept(connection_state.get()); // ok! if (msgr->dispatch_queue.stop) @@ -1041,7 +1042,7 @@ int Pipe::connect() session_security = NULL; } - msgr->dispatch_queue.queue_connect(connection_state); + msgr->dispatch_queue.queue_connect(connection_state.get()); if (!reader_running) { ldout(msgr->cct,20) << "connect starting reader" << dendl; @@ -1170,6 +1171,8 @@ void Pipe::fault(bool onread) if (state == STATE_CLOSED || state == STATE_CLOSING) { ldout(msgr->cct,10) << "fault already closed|closing" << dendl; + if (connection_state->clear_pipe(this)) + msgr->dispatch_queue.queue_reset(connection_state.get()); return; } @@ -1205,9 +1208,8 @@ void Pipe::fault(bool onread) // disconnect from Connection, and mark it failed. future messages // will be dropped. assert(connection_state); - connection_state->clear_pipe(this); - - msgr->dispatch_queue.queue_reset(connection_state); + if (connection_state->clear_pipe(this)) + msgr->dispatch_queue.queue_reset(connection_state.get()); return; } @@ -1273,7 +1275,7 @@ void Pipe::was_session_reset() delay_thread->discard(); discard_out_queue(); - msgr->dispatch_queue.queue_remote_reset(connection_state); + msgr->dispatch_queue.queue_remote_reset(connection_state.get()); if (randomize_out_seq()) { lsubdout(msgr->cct,ms,15) << "was_session_reset(): Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl; @@ -1383,7 +1385,7 @@ void Pipe::reader() continue; } - m->set_connection(connection_state->get()); + m->set_connection(connection_state.get()); // note last received message. in_seq = m->get_seq(); @@ -1516,7 +1518,7 @@ void Pipe::writer() } // associate message with Connection (for benefit of encode_payload) - m->set_connection(connection_state->get()); + m->set_connection(connection_state.get()); ldout(msgr->cct,20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl; diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h index e2a155a6038..b359bc2caf7 100644 --- a/src/msg/Pipe.h +++ b/src/msg/Pipe.h @@ -150,7 +150,7 @@ class DispatchQueue; protected: friend class SimpleMessenger; - Connection *connection_state; + ConnectionRef connection_state; utime_t backoff; // backoff time diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index c9764fac324..f1e614628df 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -66,7 +66,6 @@ SimpleMessenger::~SimpleMessenger() assert(!did_bind); // either we didn't bind or we shut down the Accepter assert(rank_pipe.empty()); // we don't have any running Pipes. assert(reaper_stop && !reaper_started); // the reaper thread is stopped - local_connection->put(); } void SimpleMessenger::ready() @@ -84,8 +83,8 @@ void SimpleMessenger::ready() int SimpleMessenger::shutdown() { ldout(cct,10) << "shutdown " << get_myaddr() << dendl; - dispatch_queue.shutdown(); mark_down_all(); + dispatch_queue.shutdown(); return 0; } @@ -112,7 +111,7 @@ int SimpleMessenger::_send_message(Message *m, const entity_inst_t& dest, lock.Lock(); Pipe *pipe = _lookup_pipe(dest.addr); - submit_message(m, (pipe ? pipe->connection_state : NULL), + submit_message(m, (pipe ? pipe->connection_state.get() : NULL), dest.addr, dest.name.type(), lazy); lock.Unlock(); return 0; @@ -357,12 +356,12 @@ bool SimpleMessenger::verify_authorizer(Connection *con, int peer_type, return ms_deliver_verify_authorizer(con, peer_type, protocol, authorizer, authorizer_reply, isvalid,session_key); } -Connection *SimpleMessenger::get_connection(const entity_inst_t& dest) +ConnectionRef SimpleMessenger::get_connection(const entity_inst_t& dest) { Mutex::Locker l(lock); if (my_inst.addr == dest.addr) { // local - return static_cast<Connection *>(local_connection->get()); + return local_connection; } // remote @@ -376,14 +375,14 @@ Connection *SimpleMessenger::get_connection(const entity_inst_t& dest) } Mutex::Locker l(pipe->pipe_lock); if (pipe->connection_state) - return static_cast<Connection *>(pipe->connection_state->get()); + return pipe->connection_state; // we failed too quickly! retry. FIXME. } } -Connection *SimpleMessenger::get_loopback_connection() +ConnectionRef SimpleMessenger::get_loopback_connection() { - return static_cast<Connection*>(local_connection->get()); + return local_connection; } void SimpleMessenger::submit_message(Message *m, Connection *con, @@ -563,7 +562,10 @@ void SimpleMessenger::mark_down_all() p->unregister_pipe(); p->pipe_lock.Lock(); p->stop(); + ConnectionRef con = p->connection_state; p->pipe_lock.Unlock(); + if (con && con->clear_pipe(p)) + dispatch_queue.queue_reset(con.get()); } lock.Unlock(); } diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 0d54d174965..47ee145aa5e 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -303,8 +303,8 @@ public: * @param dest The entity to get a connection for. * @return The requested Connection, as a pointer whose reference you own. */ - virtual Connection *get_connection(const entity_inst_t& dest); - virtual Connection *get_loopback_connection(); + virtual ConnectionRef get_connection(const entity_inst_t& dest); + virtual ConnectionRef get_loopback_connection(); /** * Send a "keepalive" ping to the given dest, if it has a working Connection. * If the Messenger doesn't already have a Connection, or if the underlying @@ -555,7 +555,7 @@ public: int timeout; /// con used for sending messages to ourselves - Connection *local_connection; + ConnectionRef local_connection; /** * @defgroup SimpleMessenger internals diff --git a/src/osd/ClassHandler.cc b/src/osd/ClassHandler.cc index 6675e9e4fc4..5af2ac01a0f 100644 --- a/src/osd/ClassHandler.cc +++ b/src/osd/ClassHandler.cc @@ -31,6 +31,13 @@ int ClassHandler::open_class(const string& cname, ClassData **pcls) return 0; } +void ClassHandler::shutdown() +{ + for (map<string, ClassData>::iterator p = classes.begin(); p != classes.end(); ++p) { + dlclose(p->second.handle); + } +} + ClassHandler::ClassData *ClassHandler::_get_class(const string& cname) { ClassData *cls; diff --git a/src/osd/ClassHandler.h b/src/osd/ClassHandler.h index f336d861fbc..733ed01a35d 100644 --- a/src/osd/ClassHandler.h +++ b/src/osd/ClassHandler.h @@ -82,7 +82,8 @@ public: ClassData *register_class(const char *cname); void unregister_class(ClassData *cls); - + + void shutdown(); }; diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 521e7f69b0b..a18c4459719 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -1573,6 +1573,7 @@ int OSD::shutdown() service.shutdown(); op_tracker.on_shutdown(); + class_handler->shutdown(); client_messenger->shutdown(); cluster_messenger->shutdown(); hbclient_messenger->shutdown(); @@ -2511,11 +2512,9 @@ void OSD::_add_heartbeat_peer(int p) hi->peer = p; HeartbeatSession *s = new HeartbeatSession(p); hi->con_back = cons.first.get(); - hi->con_back->get(); hi->con_back->set_priv(s); if (cons.second) { hi->con_front = cons.second.get(); - hi->con_front->get(); hi->con_front->set_priv(s->get()); } dout(10) << "_add_heartbeat_peer: new peer osd." << p @@ -2537,10 +2536,8 @@ void OSD::_remove_heartbeat_peer(int n) << " " << (q->second.con_front ? q->second.con_front->get_peer_addr() : entity_addr_t()) << dendl; hbclient_messenger->mark_down(q->second.con_back); - q->second.con_back->put(); if (q->second.con_front) { hbclient_messenger->mark_down(q->second.con_front); - q->second.con_front->put(); } heartbeat_peers.erase(q); } @@ -2670,10 +2667,8 @@ void OSD::reset_heartbeat_peers() while (!heartbeat_peers.empty()) { HeartbeatInfo& hi = heartbeat_peers.begin()->second; hbclient_messenger->mark_down(hi.con_back); - hi.con_back->put(); if (hi.con_front) { hbclient_messenger->mark_down(hi.con_front); - hi.con_front->put(); } heartbeat_peers.erase(heartbeat_peers.begin()); } @@ -2694,6 +2689,7 @@ void OSD::handle_osd_ping(MOSDPing *m) heartbeat_lock.Lock(); if (is_stopping()) { heartbeat_lock.Unlock(); + m->put(); return; } @@ -2953,6 +2949,7 @@ bool OSD::heartbeat_reset(Connection *con) heartbeat_lock.Lock(); if (is_stopping()) { heartbeat_lock.Unlock(); + s->put(); return true; } map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(s->peer); @@ -2963,22 +2960,18 @@ bool OSD::heartbeat_reset(Connection *con) << ", reopening" << dendl; if (con != p->second.con_back) { hbclient_messenger->mark_down(p->second.con_back); - p->second.con_back->put(); } - p->second.con_back = NULL; + p->second.con_back.reset(NULL); if (p->second.con_front && con != p->second.con_front) { hbclient_messenger->mark_down(p->second.con_front); - p->second.con_front->put(); } - p->second.con_front = NULL; + p->second.con_front.reset(NULL); pair<ConnectionRef,ConnectionRef> newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch); if (newcon.first) { p->second.con_back = newcon.first.get(); - p->second.con_back->get(); p->second.con_back->set_priv(s->get()); if (newcon.second) { p->second.con_front = newcon.second.get(); - p->second.con_front->get(); p->second.con_front->set_priv(s->get()); } } else { @@ -3359,6 +3352,7 @@ bool OSD::ms_handle_reset(Connection *con) if (!session) return false; session->wstate.reset(); + session->con.reset(NULL); // break con <-> session ref cycle session->put(); return true; } @@ -3549,10 +3543,7 @@ ConnectionRef OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch) next_osdmap->get_info(peer).up_from > from_epoch) { return NULL; } - ConnectionRef ret( - osd->cluster_messenger->get_connection(next_osdmap->get_cluster_inst(peer))); - ret->put(); // Ref from get_connection - return ret; + return osd->cluster_messenger->get_connection(next_osdmap->get_cluster_inst(peer)); } pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t from_epoch) @@ -3568,10 +3559,7 @@ pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t f return ret; } ret.first = osd->hbclient_messenger->get_connection(next_osdmap->get_hb_back_inst(peer)); - ret.first->put(); // Ref from get_connection ret.second = osd->hbclient_messenger->get_connection(next_osdmap->get_hb_front_inst(peer)); - if (ret.second) - ret.second->put(); // Ref from get_connection return ret; } @@ -3755,7 +3743,7 @@ void OSD::handle_command(MMonCommand *m) void OSD::handle_command(MCommand *m) { - Connection *con = m->get_connection(); + ConnectionRef con = m->get_connection(); Session *session = static_cast<Session *>(con->get_priv()); if (!session) { client_messenger->send_message(new MCommandReply(m, -EPERM), con); @@ -3772,7 +3760,7 @@ void OSD::handle_command(MCommand *m) return; } - Command *c = new Command(m->cmd, m->get_tid(), m->get_data(), con); + Command *c = new Command(m->cmd, m->get_tid(), m->get_data(), con.get()); command_wq.queue(c); m->put(); @@ -4215,9 +4203,8 @@ bool OSD::heartbeat_dispatch(Message *m) case CEPH_MSG_OSD_MAP: { - Connection *self = cluster_messenger->get_loopback_connection(); + ConnectionRef self = cluster_messenger->get_loopback_connection(); cluster_messenger->send_message(m, self); - self->put(); } break; @@ -4763,10 +4750,8 @@ void OSD::note_down_osd(int peer) map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(peer); if (p != heartbeat_peers.end()) { hbclient_messenger->mark_down(p->second.con_back); - p->second.con_back->put(); if (p->second.con_front) { hbclient_messenger->mark_down(p->second.con_front); - p->second.con_front->put(); } heartbeat_peers.erase(p); } @@ -6671,7 +6656,7 @@ void OSD::handle_op(OpRequestRef op) return; } // share our map with sender, if they're old - _share_map_incoming(m->get_source(), m->get_connection(), m->get_map_epoch(), + _share_map_incoming(m->get_source(), m->get_connection().get(), m->get_map_epoch(), static_cast<Session *>(m->get_connection()->get_priv())); if (op->rmw_flags == 0) { @@ -6799,7 +6784,7 @@ void OSD::handle_sub_op(OpRequestRef op) return; // share our map with sender, if they're old - _share_map_incoming(m->get_source(), m->get_connection(), m->map_epoch, + _share_map_incoming(m->get_source(), m->get_connection().get(), m->map_epoch, static_cast<Session*>(m->get_connection()->get_priv())); if (service.splitting(pgid)) { @@ -6836,7 +6821,7 @@ void OSD::handle_sub_op_reply(OpRequestRef op) if (!require_same_or_newer_map(op, m->get_map_epoch())) return; // share our map with sender, if they're old - _share_map_incoming(m->get_source(), m->get_connection(), m->get_map_epoch(), + _share_map_incoming(m->get_source(), m->get_connection().get(), m->get_map_epoch(), static_cast<Session*>(m->get_connection()->get_priv())); PG *pg = _have_pg(pgid) ? _lookup_pg(pgid) : NULL; diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 0e35250b79a..3d8218e6aa1 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -303,9 +303,15 @@ public: void send_message_osd_cluster(Message *m, Connection *con) { cluster_messenger->send_message(m, con); } + void send_message_osd_cluster(Message *m, const ConnectionRef& con) { + cluster_messenger->send_message(m, con.get()); + } void send_message_osd_client(Message *m, Connection *con) { client_messenger->send_message(m, con); } + void send_message_osd_client(Message *m, const ConnectionRef& con) { + client_messenger->send_message(m, con.get()); + } entity_name_t get_cluster_msgr_name() { return cluster_messenger->get_myname(); } @@ -688,7 +694,7 @@ public: OSDCap caps; int64_t auid; epoch_t last_sent_epoch; - Connection *con; + ConnectionRef con; WatchConState wstate; Session() : auid(-1), last_sent_epoch(0), con(0) {} @@ -699,8 +705,8 @@ private: /// information about a heartbeat peer struct HeartbeatInfo { int peer; ///< peer - Connection *con_front; ///< peer connection (front) - Connection *con_back; ///< peer connection (back) + ConnectionRef con_front; ///< peer connection (front) + ConnectionRef con_back; ///< peer connection (back) utime_t first_tx; ///< time we sent our first ping request utime_t last_tx; ///< last time we sent a ping request utime_t last_rx_front; ///< last time we got a ping reply on the front side @@ -1242,17 +1248,10 @@ protected: vector<string> cmd; tid_t tid; bufferlist indata; - Connection *con; + ConnectionRef con; Command(vector<string>& c, tid_t t, bufferlist& bl, Connection *co) - : cmd(c), tid(t), indata(bl), con(co) { - if (con) - con->get(); - } - ~Command() { - if (con) - con->put(); - } + : cmd(c), tid(t), indata(bl), con(co) {} }; list<Command*> command_queue; struct CommandWQ : public ThreadPool::WorkQueue<Command> { @@ -1284,7 +1283,7 @@ protected: delete c; return; } - osd->do_command(c->con, c->tid, c->cmd, c->indata); + osd->do_command(c->con.get(), c->tid, c->cmd, c->indata); osd->osd_lock.Unlock(); delete c; } diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index eeb2c215d29..60e0c889932 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -3498,6 +3498,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx) ConnectionRef conn(ctx->op->request->get_connection()); boost::intrusive_ptr<OSD::Session> session( (OSD::Session *)conn->get_priv()); + session->put(); // get_priv() takes a ref, and so does the intrusive_ptr entity_name_t entity = ctx->reqid.name; dout(15) << "do_osd_op_effects on session " << session.get() << dendl; diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 0afc56805ea..0ec92e2641d 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -686,12 +686,11 @@ void Objecter::handle_osd_map(MOSDMap *m) void Objecter::C_Op_Map_Latest::finish(int r) { + if (r == -EAGAIN || r == -ECANCELED) + return; + lgeneric_subdout(objecter->cct, objecter, 10) << "op_map_latest r=" << r << " tid=" << tid << " latest " << latest << dendl; - if (r == -EAGAIN) { - // ignore callback; we will retry in resend_mon_ops() - return; - } Mutex::Locker l(objecter->client_lock); @@ -763,7 +762,7 @@ void Objecter::op_cancel_map_check(Op *op) void Objecter::C_Linger_Map_Latest::finish(int r) { - if (r == -EAGAIN) { + if (r == -EAGAIN || r == -ECANCELED) { // ignore callback; we will retry in resend_mon_ops() return; } @@ -833,7 +832,7 @@ void Objecter::linger_cancel_map_check(LingerOp *op) void Objecter::C_Command_Map_Latest::finish(int r) { - if (r == -EAGAIN) { + if (r == -EAGAIN || r == -ECANCELED) { // ignore callback; we will retry in resend_mon_ops() return; } @@ -914,7 +913,6 @@ void Objecter::reopen_session(OSDSession *s) ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now " << inst << dendl; if (s->con) { messenger->mark_down(s->con); - s->con->put(); logger->inc(l_osdc_osd_session_close); } s->con = messenger->get_connection(inst); @@ -927,7 +925,6 @@ void Objecter::close_session(OSDSession *s) ldout(cct, 10) << "close_session for osd." << s->osd << dendl; if (s->con) { messenger->mark_down(s->con); - s->con->put(); logger->inc(l_osdc_osd_session_close); } s->ops.clear(); @@ -1363,8 +1360,6 @@ void Objecter::finish_op(Op *op) op->session_item.remove_myself(); if (op->budgeted) put_op_budget(op); - if (op->con) - op->con->put(); ops.erase(op->tid); logger->set(l_osdc_op_active, ops.size()); @@ -1388,11 +1383,10 @@ void Objecter::send_op(Op *op) if (op->con) { ldout(cct, 20) << " revoking rx buffer for " << op->tid << " on " << op->con << dendl; op->con->revoke_rx_buffer(op->tid); - op->con->put(); } if (op->outbl && op->outbl->length()) { ldout(cct, 20) << " posting rx buffer for " << op->tid << " on " << op->session->con << dendl; - op->con = op->session->con->get(); + op->con = op->session->con; op->con->post_rx_buffer(op->tid, *op->outbl); } diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index b64c8f77ed0..850d974472e 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -738,7 +738,7 @@ public: vector<int> acting; bool used_replica; - Connection *con; // for rx buffer only + ConnectionRef con; // for rx buffer only vector<OSDOp> ops; @@ -1064,7 +1064,7 @@ public: xlist<CommandOp*> command_ops; int osd; int incarnation; - Connection *con; + ConnectionRef con; OSDSession(int o) : osd(o), incarnation(0), con(NULL) {} }; |