summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2012-11-30 13:47:27 -0800
committerSage Weil <sage@inktank.com>2012-11-30 13:47:27 -0800
commit1652463dc0d39f327e8a8d4eaf770611d69f11cd (patch)
tree4529ab64de2f5cc8ff52d89226f1c2e547ee398a
parent1c6c92123d8789951a6fda5408d464afeec90bae (diff)
downloadceph-1652463dc0d39f327e8a8d4eaf770611d69f11cd.tar.gz
wip livenessinfo, miscwip-osd-readhole
-rw-r--r--src/osd/OSD.cc70
-rw-r--r--src/osd/OSD.h14
-rw-r--r--src/osd/osd_types.cc4
3 files changed, 46 insertions, 42 deletions
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 65566111961..2e076e056f7 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -1795,7 +1795,9 @@ void OSD::_add_heartbeat_peer(int p)
hi->con = con.get();
hi->con->get();
hi->peer = p;
- hi->con->set_priv(new HeartbeatSession(p));
+ HeartbeatSession *s = new HeartbeatSession(p);
+ s->info = _get_peer_liveness(p, osdmap->get_info(p).up_from);
+ hi->con->set_priv(s);
dout(10) << "_add_heartbeat_peer: new peer osd." << p
<< " " << hi->con->get_peer_addr() << dendl;
} else {
@@ -1929,17 +1931,17 @@ void OSD::handle_osd_ping(MOSDPing *m)
}
}
- HeartbeatSession *s = con->get_priv();
- if (s) {
- s->last_ack = ceph_clock_now(g_ceph_context);
- } else {
+ Connection *con = m->get_connection();
+ HeartbeatSession *s = (HeartbeatSession*)con->get_priv();
+ if (!s) {
s = new HeartbeatSession(-1);
+ s->info = _get_peer_liveness(m->get_source().num(), m->up_from);
s->get();
s->client = make_pair(m->get_source().num(), m->up_from);
- heartbeat_clients[s->client].insert(s);
con->set_priv(s);
dout(20) << "handle_osd_ping new client session for " << con->get_peer_addr() << " " << s->client << " " << s << dendl;
}
+ s->info->last_tx_ack = ceph_clock_now(g_ceph_context);
}
break;
@@ -2040,41 +2042,35 @@ void OSD::heartbeat_check()
}
// trim old server sessions
- utime_t cutoff = ceph_clock_now(g_ceph_context);
- cutoff -= g_conf->osd_heartbeat_read_interval;
- list<HeartbeatSession*>::iterator p = heartbeat_clients_closed.begin();
- while (p != heartbeat_clients_closed.end()) {
- HeartbeatSession *s = *p;
- if (s->last_ack >= cutoff)
- break;
- dout(20) << "heartbeat_check trimming closed session " << s << dendl;
- heartbeat_clients_closed.erase(p++);
-
- map<pair<int,epoch_t>, set<HeartbeatSession*> >::iterator q = heartbeat_clients.find(s->client);
- assert(q != heartbeat_clients.end());
- q->second.erase(s);
- if (q->second.empty())
- heartbeat_clients.erase(q);
-
- s->put();
+ {
+ utime_t cutoff = ceph_clock_now(g_ceph_context);
+ cutoff -= g_conf->osd_heartbeat_read_interval;
+ list<HeartbeatSession*>::iterator p = heartbeat_clients_closed.begin();
+ while (p != heartbeat_clients_closed.end()) {
+ HeartbeatSession *s = *p;
+ if (s->info->last_rx_ack >= cutoff ||
+ s->info->last_tx_ack >= cutoff)
+ break;
+ dout(20) << "heartbeat_check trimming closed session " << s << dendl;
+ heartbeat_clients_closed.erase(p++);
+ s->put();
+ }
}
}
-utime_t OSD::get_last_hb_ack(int peer, epoch_t up_from)
+OSD::LivenessInfoRef OSD::get_peer_liveness(int peer, epoch_t up_from)
{
Mutex::Locker l(heartbeat_lock);
- map<pair<int,epoch_t>,set<HeartbeatSession*> >::iterator p = heartbeat_clients.find(make_pair(peer, up_from));
- if (p == heartbeat_clients.end())
- return utime_t();
- set<HeartSession*>::iterator q = p->second.begin();
- utime_t min = (*q)->last_ack;
- while (++q != heartbeat_clients.end()) {
- if ((*q)->last_ack > min) {
- min = (*q)->last_ack;
- }
+ return _get_peer_liveness(peer, up_from);
+}
+
+OSD::LivenessInfoRef OSD::_get_peer_liveness(int peer, epoch_t up_from)
+{
+ map<pair<int,epoch_t>,LivenessInfoRef>::iterator p = heartbeat_peer_info.find(make_pair(peer, up_from));
+ if (p == heartbeat_peer_info.end()) {
+ p = heartbeat_peer_info.insert(make_pair(make_pair(peer, up_from), new LivenessInfo));
}
- dout(20) << "get_last_hb_ack " << peer << "," << up_from << " " << min << dendl;
- return min;
+ return p->second;
}
void OSD::heartbeat()
@@ -3838,13 +3834,13 @@ void OSD::handle_osd_map(MOSDMap *m)
hbclient_messenger->mark_down_all();
if (hbserver_messenger_previous) {
- hbserver_messenger_prevoius->mark_down_all();
+ hbserver_messenger_previous->mark_down_all();
hbserver_messenger_previous->shutdown();
// FIXME: don't leak!
}
hbserver_messenger_previous = hbserver_messenger;
hbserver_messenger = create_hbserver_messenger(whoami, nonce);
- entity_addr_t hb_addr = hbserver_messenger_previous->get_addr();
+ entity_addr_t hb_addr = hbserver_messenger_previous->get_myaddr();
hb_addr.set_port(0);
r = hbserver_messenger->bind(hb_addr);
if (r != 0)
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index f02c10e4b39..478ae1998cb 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -501,6 +501,12 @@ public:
private:
// -- heartbeat --
+ /// liveness information about a peer, both acks sent and received.
+ struct LivenessInfo {
+ utime_t last_tx_ack; ///< last ping ack we sent
+ utime_t last_rx_ack; ///< last ping ack we received
+ };
+ typedef boost::shared_ptr<LivenessInfo> LivenessInfoRef;
/// information about a heartbeat peer
struct HeartbeatInfo {
int peer; ///< peer
@@ -513,9 +519,9 @@ private:
/// state attached to incoming or outgoing heartbeat connections
struct HeartbeatSession : public RefCountedObject {
int peer; ///< if >= 0, we are a client connecting to this peer.
- utime_t last_ack;
bool closed;
pair<int,epoch_t> client; ///< client (osd, up_from)
+ LivenessInfoRef info;
HeartbeatSession(int p) : peer(p), closed(false) {}
@@ -535,7 +541,7 @@ private:
map<int,HeartbeatInfo> heartbeat_peers; ///< map of osd id to HeartbeatInfo
utime_t last_mon_heartbeat;
Messenger *hbclient_messenger, *hbserver_messenger, *hbserver_messenger_previous;
- map<pair<int,epoch_t>,set<HeartbeatSession*> > heartbeat_clients; ///< (epoch, up_from) -> sessions...
+ map<pair<int,epoch_t>,LivenessInfoRef> heartbeat_peer_info; ///< (epoch, up_from) -> sessions...
list<HeartbeatSession*> heartbeat_clients_closed; ///< closed sessions
void _add_heartbeat_peer(int p);
@@ -547,6 +553,8 @@ private:
void heartbeat_entry();
void need_heartbeat_peer_update();
utime_t get_last_hb_ack(int peer, epoch_t up_from);
+ LivenessInfoRef _get_peer_liveness(int peer, epoch_t up_from);
+ LivenessInfoRef get_peer_liveness(int peer, epoch_t up_from);
struct T_Heartbeat : public Thread {
OSD *osd;
@@ -1391,7 +1399,7 @@ public:
void suicide(int exitcode);
int shutdown();
- static Messenger *create_hbserver_messenger();
+ static Messenger *create_hbserver_messenger(int whoami, uint64_t nonce);
void handle_signal(int signum);
diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc
index 3bd07cde73c..31a20cd2d25 100644
--- a/src/osd/osd_types.cc
+++ b/src/osd/osd_types.cc
@@ -1460,7 +1460,7 @@ void pg_interval_t::dump(Formatter *f) const
f->open_array_section("acting");
for (vector<int>::const_iterator p = acting.begin(); p != acting.end(); ++p)
f->dump_int("osd", *p);
- f->dump_stream("end_stamp") << duration;
+ f->dump_stream("end_stamp") << end_stamp;
f->dump_unsigned("primary_up_from", primary_up_from);
f->close_section();
}
@@ -1504,7 +1504,7 @@ bool pg_interval_t::check_new_interval(
i.up = old_up;
i.end_stamp = osdmap->get_modified();
if (old_acting.size())
- i.primary_up_from = lastmap->get_osd_info(old_acting[0])->up_from;
+ i.primary_up_from = lastmap->get_info(old_acting[0]).up_from;
if (i.acting.size() >=
osdmap->get_pools().find(pool_id)->second.min_size) {