diff options
author | Sage Weil <sage@inktank.com> | 2012-11-05 09:56:15 -0800 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2012-11-30 13:18:21 -0800 |
commit | 1c6c92123d8789951a6fda5408d464afeec90bae (patch) | |
tree | a05da1a2ee35dad75bd45b6d416f69ce68cb91b5 | |
parent | cdb8b55548a800a98b437eec9d5254d9f1408cac (diff) | |
download | ceph-1c6c92123d8789951a6fda5408d464afeec90bae.tar.gz |
wip
-rw-r--r-- | src/osd/OSD.cc | 65 | ||||
-rw-r--r-- | src/osd/OSD.h | 22 | ||||
-rw-r--r-- | src/osd/PG.cc | 2 | ||||
-rw-r--r-- | src/osd/PG.h | 1 | ||||
-rw-r--r-- | src/osd/osd_types.cc | 1 | ||||
-rw-r--r-- | src/osd/osd_types.h | 5 |
6 files changed, 88 insertions, 8 deletions
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index bbfa720a793..65566111961 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -1795,7 +1795,7 @@ void OSD::_add_heartbeat_peer(int p) hi->con = con.get(); hi->con->get(); hi->peer = p; - hi->con->set_priv(new HeartbeatClientSession(p)); + hi->con->set_priv(new HeartbeatSession(p)); dout(10) << "_add_heartbeat_peer: new peer osd." << p << " " << hi->con->get_peer_addr() << dendl; } else { @@ -1928,6 +1928,18 @@ void OSD::handle_osd_ping(MOSDPing *m) } } } + + HeartbeatSession *s = con->get_priv(); + if (s) { + s->last_ack = ceph_clock_now(g_ceph_context); + } else { + s = new HeartbeatSession(-1); + s->get(); + s->client = make_pair(m->get_source().num(), m->up_from); + heartbeat_clients[s->client].insert(s); + con->set_priv(s); + dout(20) << "handle_osd_ping new client session for " << con->get_peer_addr() << " " << s->client << " " << s << dendl; + } } break; @@ -2026,6 +2038,43 @@ void OSD::heartbeat_check() failure_queue[p->first] = p->second.last_rx; } } + + // trim old server sessions + utime_t cutoff = ceph_clock_now(g_ceph_context); + cutoff -= g_conf->osd_heartbeat_read_interval; + list<HeartbeatSession*>::iterator p = heartbeat_clients_closed.begin(); + while (p != heartbeat_clients_closed.end()) { + HeartbeatSession *s = *p; + if (s->last_ack >= cutoff) + break; + dout(20) << "heartbeat_check trimming closed session " << s << dendl; + heartbeat_clients_closed.erase(p++); + + map<pair<int,epoch_t>, set<HeartbeatSession*> >::iterator q = heartbeat_clients.find(s->client); + assert(q != heartbeat_clients.end()); + q->second.erase(s); + if (q->second.empty()) + heartbeat_clients.erase(q); + + s->put(); + } +} + +utime_t OSD::get_last_hb_ack(int peer, epoch_t up_from) +{ + Mutex::Locker l(heartbeat_lock); + map<pair<int,epoch_t>,set<HeartbeatSession*> >::iterator p = heartbeat_clients.find(make_pair(peer, up_from)); + if (p == heartbeat_clients.end()) + return utime_t(); + set<HeartSession*>::iterator q = p->second.begin(); + utime_t min = (*q)->last_ack; + while (++q != heartbeat_clients.end()) { + if ((*q)->last_ack > min) { + min = (*q)->last_ack; + } + } + dout(20) << "get_last_hb_ack " << peer << "," << up_from << " " << min << dendl; + return min; } void OSD::heartbeat() @@ -2089,8 +2138,11 @@ void OSD::heartbeat() bool OSD::heartbeat_reset(Connection *con) { - HeartbeatClientSession *s = (HeartbeatClientSession*)con->get_priv(); - if (s) { + HeartbeatSession *s = (HeartbeatSession*)con->get_priv(); + if (!s) + return true; + if (s->is_client()) { + // client heartbeat_lock.Lock(); map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(s->peer); if (p != heartbeat_peers.end() && @@ -2111,6 +2163,13 @@ bool OSD::heartbeat_reset(Connection *con) hbclient_messenger->mark_down(con); heartbeat_lock.Unlock(); s->put(); + } else { + // server + heartbeat_lock.Lock(); + dout(10) << "heartbeat_reset noting closed hb server con " << con << " session " << s << dendl; + s->closed = true; + heartbeat_clients_closed.push_back(s); + heartbeat_lock.Unlock(); } return true; } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index cf55328d584..f02c10e4b39 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -510,10 +510,21 @@ private: utime_t last_rx; ///< last time we got a ping reply epoch_t epoch; ///< most recent epoch we wanted this peer }; - /// state attached to outgoing heartbeat connections - struct HeartbeatClientSession : public RefCountedObject { - int peer; - HeartbeatClientSession(int p) : peer(p) {} + /// state attached to incoming or outgoing heartbeat connections + struct HeartbeatSession : public RefCountedObject { + int peer; ///< if >= 0, we are a client connecting to this peer. + utime_t last_ack; + bool closed; + pair<int,epoch_t> client; ///< client (osd, up_from) + + HeartbeatSession(int p) : peer(p), closed(false) {} + + bool is_client() { + return peer >= 0; + } + bool is_server() { + return peer < 0; + } }; Mutex heartbeat_lock; map<int, int> debug_heartbeat_drops_remaining; @@ -524,6 +535,8 @@ private: map<int,HeartbeatInfo> heartbeat_peers; ///< map of osd id to HeartbeatInfo utime_t last_mon_heartbeat; Messenger *hbclient_messenger, *hbserver_messenger, *hbserver_messenger_previous; + map<pair<int,epoch_t>,set<HeartbeatSession*> > heartbeat_clients; ///< (epoch, up_from) -> sessions... + list<HeartbeatSession*> heartbeat_clients_closed; ///< closed sessions void _add_heartbeat_peer(int p); bool heartbeat_reset(Connection *con); @@ -533,6 +546,7 @@ private: void heartbeat_check(); void heartbeat_entry(); void need_heartbeat_peer_update(); + utime_t get_last_hb_ack(int peer, epoch_t up_from); struct T_Heartbeat : public Thread { OSD *osd; diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 617ba9e250f..4970de1b04d 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -71,6 +71,8 @@ PG::PG(OSDService *o, OSDMapRef curmap, role(0), state(0), send_notify(false), + primary_up_from(0), + prior_set_built(false), need_up_thru(false), need_flush(false), last_peering_reset(0), diff --git a/src/osd/PG.h b/src/osd/PG.h index b9693fb072a..d2fcde657f1 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -459,6 +459,7 @@ public: map<int,eversion_t> peer_last_complete_ondisk; eversion_t min_last_complete_ondisk; // up: min over last_complete_ondisk, peer_last_complete_ondisk eversion_t pg_trim_to; + epoch_t primary_up_from; // [primary only] content recovery state protected: diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc index b4121bbdbf3..3bd07cde73c 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -1404,6 +1404,7 @@ void pg_notify_t::dump(Formatter *f) const info.dump(f); f->close_section(); } + f->dump_stream("last_hb") << last_hb; } void pg_notify_t::generate_test_instances(list<pg_notify_t*>& o) diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index e7592248020..c4261c01c9e 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -1,4 +1,4 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system @@ -1098,6 +1098,8 @@ struct pg_notify_t { epoch_t query_epoch; epoch_t epoch_sent; pg_info_t info; + utime_t last_hb; ///< last time we acked a primary heartbeat + pg_notify_t() : query_epoch(0), epoch_sent(0) {} pg_notify_t(epoch_t query_epoch, epoch_t epoch_sent, @@ -1105,6 +1107,7 @@ struct pg_notify_t { : query_epoch(query_epoch), epoch_sent(epoch_sent), info(info) {} + void encode(bufferlist &bl) const; void decode(bufferlist::iterator &p); void dump(Formatter *f) const; |