summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-07-17 14:50:41 -0700
committerSage Weil <sage@inktank.com>2013-07-17 14:50:41 -0700
commitea1c623406ac0761c6d63041dcc5ca849d1fa932 (patch)
treeb04ffe68c357d6529a66afbdb84752040242abfc
parent0ebf23cee84180a0ae8b9fc0d8c2463ca31e6cbc (diff)
parent57bd6fd51b094f8406c46538bcae7486d8b77a6f (diff)
downloadceph-ea1c623406ac0761c6d63041dcc5ca849d1fa932.tar.gz
Merge pull request #441 from ceph/wip-5626
msgr fixes for lossless peer sessions Reviewed-by: Greg Farnum <greg@inktank.com>
-rw-r--r--src/msg/Accepter.cc5
-rw-r--r--src/msg/Pipe.cc64
-rw-r--r--src/msg/Pipe.h3
-rw-r--r--src/msg/SimpleMessenger.cc18
-rw-r--r--src/msg/SimpleMessenger.h6
-rw-r--r--src/osd/OSD.cc5
6 files changed, 78 insertions, 23 deletions
diff --git a/src/msg/Accepter.cc b/src/msg/Accepter.cc
index 4d13be8fdca..d6e94d1cc51 100644
--- a/src/msg/Accepter.cc
+++ b/src/msg/Accepter.cc
@@ -165,6 +165,11 @@ int Accepter::rebind(const set<int>& avoid_ports)
new_avoid.insert(addr.get_port());
addr.set_port(0);
+ // adjust the nonce; we want our entity_addr_t to be truly unique.
+ nonce += 1000000;
+ msgr->my_inst.addr.nonce = nonce;
+ ldout(msgr->cct,10) << " new nonce " << nonce << " and inst " << msgr->my_inst << dendl;
+
ldout(msgr->cct,10) << " will try " << addr << " and avoid ports " << new_avoid << dendl;
int r = bind(addr, new_avoid);
if (r == 0)
diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc
index d45666c21c6..e3600c8e2e6 100644
--- a/src/msg/Pipe.cc
+++ b/src/msg/Pipe.cc
@@ -210,12 +210,11 @@ void *Pipe::DelayedDelivery::entry()
int Pipe::accept()
{
ldout(msgr->cct,10) << "accept" << dendl;
+ assert(pipe_lock.is_locked());
+ assert(state == STATE_ACCEPTING);
- set_socket_options();
+ pipe_lock.Unlock();
- // my creater gave me sd via accept()
- assert(state == STATE_ACCEPTING);
-
// vars
bufferlist addrs;
entity_addr_t socket_addr;
@@ -241,6 +240,8 @@ int Pipe::accept()
// used for reading in the remote acked seq on connect
uint64_t newly_acked_seq = 0;
+ set_socket_options();
+
// announce myself.
r = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER));
if (r < 0) {
@@ -311,7 +312,6 @@ int Pipe::accept()
goto fail_unlocked;
}
-
authorizer.clear();
if (connect.authorizer_len) {
bp = buffer::create(connect.authorizer_len);
@@ -328,25 +328,32 @@ int Pipe::accept()
<< dendl;
msgr->lock.Lock(); // FIXME
+ pipe_lock.Lock();
if (msgr->dispatch_queue.stop)
goto shutting_down;
+ if (state != STATE_ACCEPTING) {
+ goto shutting_down;
+ }
// note peer's type, flags
set_peer_type(connect.host_type);
policy = msgr->get_policy(connect.host_type);
ldout(msgr->cct,10) << "accept of host_type " << connect.host_type
- << ", policy.lossy=" << policy.lossy
- << dendl;
+ << ", policy.lossy=" << policy.lossy
+ << " policy.server=" << policy.server
+ << " policy.standby=" << policy.standby
+ << " policy.resetcheck=" << policy.resetcheck
+ << dendl;
memset(&reply, 0, sizeof(reply));
reply.protocol_version = msgr->get_proto_version(peer_type, false);
+ msgr->lock.Unlock();
// mismatch?
ldout(msgr->cct,10) << "accept my proto " << reply.protocol_version
<< ", their proto " << connect.protocol_version << dendl;
if (connect.protocol_version != reply.protocol_version) {
reply.tag = CEPH_MSGR_TAG_BADPROTOVER;
- msgr->lock.Unlock();
goto reply;
}
@@ -372,12 +379,9 @@ int Pipe::accept()
if (feat_missing) {
ldout(msgr->cct,1) << "peer missing required features " << std::hex << feat_missing << std::dec << dendl;
reply.tag = CEPH_MSGR_TAG_FEATURES;
- msgr->lock.Unlock();
goto reply;
}
- msgr->lock.Unlock();
-
// Check the authorizer. If not good, bail out.
if (!msgr->verify_authorizer(connection_state.get(), peer_type, connect.authorizer_protocol, authorizer,
@@ -394,15 +398,18 @@ int Pipe::accept()
ldout(msgr->cct,10) << "accept: setting up session_security." << dendl;
+ pipe_lock.Unlock();
msgr->lock.Lock();
+ pipe_lock.Lock();
if (msgr->dispatch_queue.stop)
goto shutting_down;
-
+ if (state != STATE_ACCEPTING)
+ goto shutting_down;
// existing?
existing = msgr->_lookup_pipe(peer_addr);
if (existing) {
- existing->pipe_lock.Lock();
+ existing->pipe_lock.Lock(true); // skip lockdep check (we are locking a second Pipe here)
if (connect.global_seq < existing->peer_global_seq) {
ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
@@ -526,6 +533,8 @@ int Pipe::accept()
assert(0);
retry_session:
+ assert(existing->pipe_lock.is_locked());
+ assert(pipe_lock.is_locked());
reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
reply.connect_seq = existing->connect_seq + 1;
existing->pipe_lock.Unlock();
@@ -533,8 +542,10 @@ int Pipe::accept()
goto reply;
reply:
+ assert(pipe_lock.is_locked());
reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
reply.authorizer_len = authorizer_reply.length();
+ pipe_lock.Unlock();
r = tcp_write((char*)&reply, sizeof(reply));
if (r < 0)
goto fail_unlocked;
@@ -546,6 +557,8 @@ int Pipe::accept()
}
replace:
+ assert(existing->pipe_lock.is_locked());
+ assert(pipe_lock.is_locked());
if (connect.features & CEPH_FEATURE_RECONNECT_SEQ) {
reply_tag = CEPH_MSGR_TAG_SEQ;
existing_seq = existing->in_seq;
@@ -596,8 +609,10 @@ int Pipe::accept()
open:
// open
+ assert(pipe_lock.is_locked());
connect_seq = connect.connect_seq + 1;
peer_global_seq = connect.global_seq;
+ assert(state == STATE_ACCEPTING);
state = STATE_OPEN;
ldout(msgr->cct,10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl;
@@ -624,8 +639,11 @@ int Pipe::accept()
// ok!
if (msgr->dispatch_queue.stop)
goto shutting_down;
+ int removed = msgr->accepting_pipes.erase(this);
+ assert(removed == 1);
register_pipe();
msgr->lock.Unlock();
+ pipe_lock.Unlock();
r = tcp_write((char*)&reply, sizeof(reply));
if (r < 0) {
@@ -657,7 +675,6 @@ int Pipe::accept()
start_writer();
}
ldout(msgr->cct,20) << "accept done" << dendl;
- pipe_lock.Unlock();
maybe_start_delay_thread();
@@ -690,10 +707,10 @@ int Pipe::accept()
if (queued || replaced)
start_writer();
}
- pipe_lock.Unlock();
return -1;
shutting_down:
+ assert(pipe_lock.is_locked());
msgr->lock.Unlock();
if (msgr->cct->_conf->ms_inject_internal_delays) {
@@ -703,11 +720,9 @@ int Pipe::accept()
t.sleep();
}
- pipe_lock.Lock();
state = STATE_CLOSED;
state_closed.set(1);
fault();
- pipe_lock.Unlock();
return -1;
}
@@ -1007,6 +1022,8 @@ int Pipe::connect()
ldout(msgr->cct,2) << "connect read error on newly_acked_seq" << dendl;
goto fail_locked;
}
+ ldout(msgr->cct,2) << " got newly_acked_seq " << newly_acked_seq
+ << " vs out_seq " << out_seq << dendl;
while (newly_acked_seq > out_seq) {
Message *m = _get_next_outgoing();
assert(m);
@@ -1101,6 +1118,7 @@ void Pipe::unregister_pipe()
msgr->rank_pipe.erase(p);
} else {
ldout(msgr->cct,10) << "unregister_pipe - not registered" << dendl;
+ msgr->accepting_pipes.erase(this); // somewhat overkill, but safe.
}
}
@@ -1124,6 +1142,8 @@ void Pipe::requeue_sent()
void Pipe::discard_requeued_up_to(uint64_t seq)
{
ldout(msgr->cct, 10) << "discard_requeued_up_to " << seq << dendl;
+ if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0)
+ return;
list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
while (!rq.empty()) {
Message *m = rq.front();
@@ -1135,6 +1155,8 @@ void Pipe::discard_requeued_up_to(uint64_t seq)
rq.pop_front();
out_seq++;
}
+ if (rq.empty())
+ out_q.erase(CEPH_MSG_PRIO_HIGHEST);
}
/*
@@ -1305,11 +1327,13 @@ void Pipe::stop()
*/
void Pipe::reader()
{
- if (state == STATE_ACCEPTING)
- accept();
-
pipe_lock.Lock();
+ if (state == STATE_ACCEPTING) {
+ accept();
+ assert(pipe_lock.is_locked());
+ }
+
// loop.
while (state != STATE_CLOSED &&
state != STATE_CONNECTING) {
diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h
index b359bc2caf7..5f94305350c 100644
--- a/src/msg/Pipe.h
+++ b/src/msg/Pipe.h
@@ -247,14 +247,17 @@ class DispatchQueue;
void stop();
void _send(Message *m) {
+ assert(pipe_lock.is_locked());
out_q[m->get_priority()].push_back(m);
cond.Signal();
}
void _send_keepalive() {
+ assert(pipe_lock.is_locked());
keepalive = true;
cond.Signal();
}
Message *_get_next_outgoing() {
+ assert(pipe_lock.is_locked());
Message *m = 0;
while (!m && !out_q.empty()) {
map<int, list<Message*> >::reverse_iterator p = out_q.rbegin();
diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc
index 48e37d87098..afee0952630 100644
--- a/src/msg/SimpleMessenger.cc
+++ b/src/msg/SimpleMessenger.cc
@@ -276,9 +276,10 @@ int SimpleMessenger::bind(const entity_addr_t &bind_addr)
int SimpleMessenger::rebind(const set<int>& avoid_ports)
{
ldout(cct,1) << "rebind avoid " << avoid_ports << dendl;
- mark_down_all();
assert(did_bind);
- return accepter.rebind(avoid_ports);
+ int r = accepter.rebind(avoid_ports);
+ mark_down_all();
+ return r;
}
int SimpleMessenger::start()
@@ -311,6 +312,7 @@ Pipe *SimpleMessenger::add_accept_pipe(int sd)
p->start_reader();
p->pipe_lock.Unlock();
pipes.insert(p);
+ accepting_pipes.insert(p);
lock.Unlock();
return p;
}
@@ -559,6 +561,18 @@ void SimpleMessenger::mark_down_all()
{
ldout(cct,1) << "mark_down_all" << dendl;
lock.Lock();
+ for (set<Pipe*>::iterator q = accepting_pipes.begin(); q != accepting_pipes.end(); ++q) {
+ Pipe *p = *q;
+ ldout(cct,5) << "mark_down_all accepting_pipe " << p << dendl;
+ p->pipe_lock.Lock();
+ p->stop();
+ ConnectionRef con = p->connection_state;
+ if (con && con->clear_pipe(p))
+ dispatch_queue.queue_reset(con.get());
+ p->pipe_lock.Unlock();
+ }
+ accepting_pipes.clear();
+
while (!rank_pipe.empty()) {
hash_map<entity_addr_t,Pipe*>::iterator it = rank_pipe.begin();
Pipe *p = it->second;
diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h
index 47ee145aa5e..4538b0f18bc 100644
--- a/src/msg/SimpleMessenger.h
+++ b/src/msg/SimpleMessenger.h
@@ -514,6 +514,12 @@ private:
* invalid and can be replaced by anyone holding the msgr lock
*/
hash_map<entity_addr_t, Pipe*> rank_pipe;
+ /**
+ * set of pipes that are in the process of accepting
+ *
+ * These are not yet in the rank_pipe map.
+ */
+ set<Pipe*> accepting_pipes;
/// a set of all the Pipes we have which are somehow active
set<Pipe*> pipes;
/// a list of Pipes we want to tear down
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 109683ff051..5b8ef307dd1 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -5568,7 +5568,10 @@ bool OSD::require_same_or_newer_map(OpRequestRef op, epoch_t epoch)
int from = m->get_source().num();
if (!osdmap->have_inst(from) ||
osdmap->get_cluster_addr(from) != m->get_source_inst().addr) {
- dout(10) << "from dead osd." << from << ", marking down" << dendl;
+ dout(5) << "from dead osd." << from << ", marking down, "
+ << " msg was " << m->get_source_inst().addr
+ << " expected " << (osdmap->have_inst(from) ? osdmap->get_cluster_addr(from) : entity_addr_t())
+ << dendl;
cluster_messenger->mark_down(m->get_connection());
return false;
}