summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2012-11-05 09:56:15 -0800
committerSage Weil <sage@inktank.com>2012-11-30 13:18:21 -0800
commit1c6c92123d8789951a6fda5408d464afeec90bae (patch)
treea05da1a2ee35dad75bd45b6d416f69ce68cb91b5
parentcdb8b55548a800a98b437eec9d5254d9f1408cac (diff)
downloadceph-1c6c92123d8789951a6fda5408d464afeec90bae.tar.gz
wip
-rw-r--r--src/osd/OSD.cc65
-rw-r--r--src/osd/OSD.h22
-rw-r--r--src/osd/PG.cc2
-rw-r--r--src/osd/PG.h1
-rw-r--r--src/osd/osd_types.cc1
-rw-r--r--src/osd/osd_types.h5
6 files changed, 88 insertions, 8 deletions
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index bbfa720a793..65566111961 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -1795,7 +1795,7 @@ void OSD::_add_heartbeat_peer(int p)
hi->con = con.get();
hi->con->get();
hi->peer = p;
- hi->con->set_priv(new HeartbeatClientSession(p));
+ hi->con->set_priv(new HeartbeatSession(p));
dout(10) << "_add_heartbeat_peer: new peer osd." << p
<< " " << hi->con->get_peer_addr() << dendl;
} else {
@@ -1928,6 +1928,18 @@ void OSD::handle_osd_ping(MOSDPing *m)
}
}
}
+
+ HeartbeatSession *s = con->get_priv();
+ if (s) {
+ s->last_ack = ceph_clock_now(g_ceph_context);
+ } else {
+ s = new HeartbeatSession(-1);
+ s->get();
+ s->client = make_pair(m->get_source().num(), m->up_from);
+ heartbeat_clients[s->client].insert(s);
+ con->set_priv(s);
+ dout(20) << "handle_osd_ping new client session for " << con->get_peer_addr() << " " << s->client << " " << s << dendl;
+ }
}
break;
@@ -2026,6 +2038,43 @@ void OSD::heartbeat_check()
failure_queue[p->first] = p->second.last_rx;
}
}
+
+ // trim old server sessions
+ utime_t cutoff = ceph_clock_now(g_ceph_context);
+ cutoff -= g_conf->osd_heartbeat_read_interval;
+ list<HeartbeatSession*>::iterator p = heartbeat_clients_closed.begin();
+ while (p != heartbeat_clients_closed.end()) {
+ HeartbeatSession *s = *p;
+ if (s->last_ack >= cutoff)
+ break;
+ dout(20) << "heartbeat_check trimming closed session " << s << dendl;
+ heartbeat_clients_closed.erase(p++);
+
+ map<pair<int,epoch_t>, set<HeartbeatSession*> >::iterator q = heartbeat_clients.find(s->client);
+ assert(q != heartbeat_clients.end());
+ q->second.erase(s);
+ if (q->second.empty())
+ heartbeat_clients.erase(q);
+
+ s->put();
+ }
+}
+
+utime_t OSD::get_last_hb_ack(int peer, epoch_t up_from)
+{
+ Mutex::Locker l(heartbeat_lock);
+ map<pair<int,epoch_t>,set<HeartbeatSession*> >::iterator p = heartbeat_clients.find(make_pair(peer, up_from));
+ if (p == heartbeat_clients.end())
+ return utime_t();
+ set<HeartSession*>::iterator q = p->second.begin();
+ utime_t min = (*q)->last_ack;
+ while (++q != heartbeat_clients.end()) {
+ if ((*q)->last_ack > min) {
+ min = (*q)->last_ack;
+ }
+ }
+ dout(20) << "get_last_hb_ack " << peer << "," << up_from << " " << min << dendl;
+ return min;
}
void OSD::heartbeat()
@@ -2089,8 +2138,11 @@ void OSD::heartbeat()
bool OSD::heartbeat_reset(Connection *con)
{
- HeartbeatClientSession *s = (HeartbeatClientSession*)con->get_priv();
- if (s) {
+ HeartbeatSession *s = (HeartbeatSession*)con->get_priv();
+ if (!s)
+ return true;
+ if (s->is_client()) {
+ // client
heartbeat_lock.Lock();
map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(s->peer);
if (p != heartbeat_peers.end() &&
@@ -2111,6 +2163,13 @@ bool OSD::heartbeat_reset(Connection *con)
hbclient_messenger->mark_down(con);
heartbeat_lock.Unlock();
s->put();
+ } else {
+ // server
+ heartbeat_lock.Lock();
+ dout(10) << "heartbeat_reset noting closed hb server con " << con << " session " << s << dendl;
+ s->closed = true;
+ heartbeat_clients_closed.push_back(s);
+ heartbeat_lock.Unlock();
}
return true;
}
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index cf55328d584..f02c10e4b39 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -510,10 +510,21 @@ private:
utime_t last_rx; ///< last time we got a ping reply
epoch_t epoch; ///< most recent epoch we wanted this peer
};
- /// state attached to outgoing heartbeat connections
- struct HeartbeatClientSession : public RefCountedObject {
- int peer;
- HeartbeatClientSession(int p) : peer(p) {}
+ /// state attached to incoming or outgoing heartbeat connections
+ struct HeartbeatSession : public RefCountedObject {
+ int peer; ///< if >= 0, we are a client connecting to this peer.
+ utime_t last_ack;
+ bool closed;
+ pair<int,epoch_t> client; ///< client (osd, up_from)
+
+ HeartbeatSession(int p) : peer(p), closed(false) {}
+
+ bool is_client() {
+ return peer >= 0;
+ }
+ bool is_server() {
+ return peer < 0;
+ }
};
Mutex heartbeat_lock;
map<int, int> debug_heartbeat_drops_remaining;
@@ -524,6 +535,8 @@ private:
map<int,HeartbeatInfo> heartbeat_peers; ///< map of osd id to HeartbeatInfo
utime_t last_mon_heartbeat;
Messenger *hbclient_messenger, *hbserver_messenger, *hbserver_messenger_previous;
+ map<pair<int,epoch_t>,set<HeartbeatSession*> > heartbeat_clients; ///< (epoch, up_from) -> sessions...
+ list<HeartbeatSession*> heartbeat_clients_closed; ///< closed sessions
void _add_heartbeat_peer(int p);
bool heartbeat_reset(Connection *con);
@@ -533,6 +546,7 @@ private:
void heartbeat_check();
void heartbeat_entry();
void need_heartbeat_peer_update();
+ utime_t get_last_hb_ack(int peer, epoch_t up_from);
struct T_Heartbeat : public Thread {
OSD *osd;
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 617ba9e250f..4970de1b04d 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -71,6 +71,8 @@ PG::PG(OSDService *o, OSDMapRef curmap,
role(0),
state(0),
send_notify(false),
+ primary_up_from(0),
+ prior_set_built(false),
need_up_thru(false),
need_flush(false),
last_peering_reset(0),
diff --git a/src/osd/PG.h b/src/osd/PG.h
index b9693fb072a..d2fcde657f1 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -459,6 +459,7 @@ public:
map<int,eversion_t> peer_last_complete_ondisk;
eversion_t min_last_complete_ondisk; // up: min over last_complete_ondisk, peer_last_complete_ondisk
eversion_t pg_trim_to;
+ epoch_t primary_up_from;
// [primary only] content recovery state
protected:
diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc
index b4121bbdbf3..3bd07cde73c 100644
--- a/src/osd/osd_types.cc
+++ b/src/osd/osd_types.cc
@@ -1404,6 +1404,7 @@ void pg_notify_t::dump(Formatter *f) const
info.dump(f);
f->close_section();
}
+ f->dump_stream("last_hb") << last_hb;
}
void pg_notify_t::generate_test_instances(list<pg_notify_t*>& o)
diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h
index e7592248020..c4261c01c9e 100644
--- a/src/osd/osd_types.h
+++ b/src/osd/osd_types.h
@@ -1,4 +1,4 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
@@ -1098,6 +1098,8 @@ struct pg_notify_t {
epoch_t query_epoch;
epoch_t epoch_sent;
pg_info_t info;
+ utime_t last_hb; ///< last time we acked a primary heartbeat
+
pg_notify_t() : query_epoch(0), epoch_sent(0) {}
pg_notify_t(epoch_t query_epoch,
epoch_t epoch_sent,
@@ -1105,6 +1107,7 @@ struct pg_notify_t {
: query_epoch(query_epoch),
epoch_sent(epoch_sent),
info(info) {}
+
void encode(bufferlist &bl) const;
void decode(bufferlist::iterator &p);
void dump(Formatter *f) const;