diff options
author | Sage Weil <sage@inktank.com> | 2013-07-17 14:50:41 -0700 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-07-17 14:50:41 -0700 |
commit | ea1c623406ac0761c6d63041dcc5ca849d1fa932 (patch) | |
tree | b04ffe68c357d6529a66afbdb84752040242abfc | |
parent | 0ebf23cee84180a0ae8b9fc0d8c2463ca31e6cbc (diff) | |
parent | 57bd6fd51b094f8406c46538bcae7486d8b77a6f (diff) | |
download | ceph-ea1c623406ac0761c6d63041dcc5ca849d1fa932.tar.gz |
Merge pull request #441 from ceph/wip-5626
msgr fixes for lossless peer sessions
Reviewed-by: Greg Farnum <greg@inktank.com>
-rw-r--r-- | src/msg/Accepter.cc | 5 | ||||
-rw-r--r-- | src/msg/Pipe.cc | 64 | ||||
-rw-r--r-- | src/msg/Pipe.h | 3 | ||||
-rw-r--r-- | src/msg/SimpleMessenger.cc | 18 | ||||
-rw-r--r-- | src/msg/SimpleMessenger.h | 6 | ||||
-rw-r--r-- | src/osd/OSD.cc | 5 |
6 files changed, 78 insertions, 23 deletions
diff --git a/src/msg/Accepter.cc b/src/msg/Accepter.cc index 4d13be8fdca..d6e94d1cc51 100644 --- a/src/msg/Accepter.cc +++ b/src/msg/Accepter.cc @@ -165,6 +165,11 @@ int Accepter::rebind(const set<int>& avoid_ports) new_avoid.insert(addr.get_port()); addr.set_port(0); + // adjust the nonce; we want our entity_addr_t to be truly unique. + nonce += 1000000; + msgr->my_inst.addr.nonce = nonce; + ldout(msgr->cct,10) << " new nonce " << nonce << " and inst " << msgr->my_inst << dendl; + ldout(msgr->cct,10) << " will try " << addr << " and avoid ports " << new_avoid << dendl; int r = bind(addr, new_avoid); if (r == 0) diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc index d45666c21c6..e3600c8e2e6 100644 --- a/src/msg/Pipe.cc +++ b/src/msg/Pipe.cc @@ -210,12 +210,11 @@ void *Pipe::DelayedDelivery::entry() int Pipe::accept() { ldout(msgr->cct,10) << "accept" << dendl; + assert(pipe_lock.is_locked()); + assert(state == STATE_ACCEPTING); - set_socket_options(); + pipe_lock.Unlock(); - // my creater gave me sd via accept() - assert(state == STATE_ACCEPTING); - // vars bufferlist addrs; entity_addr_t socket_addr; @@ -241,6 +240,8 @@ int Pipe::accept() // used for reading in the remote acked seq on connect uint64_t newly_acked_seq = 0; + set_socket_options(); + // announce myself. r = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER)); if (r < 0) { @@ -311,7 +312,6 @@ int Pipe::accept() goto fail_unlocked; } - authorizer.clear(); if (connect.authorizer_len) { bp = buffer::create(connect.authorizer_len); @@ -328,25 +328,32 @@ int Pipe::accept() << dendl; msgr->lock.Lock(); // FIXME + pipe_lock.Lock(); if (msgr->dispatch_queue.stop) goto shutting_down; + if (state != STATE_ACCEPTING) { + goto shutting_down; + } // note peer's type, flags set_peer_type(connect.host_type); policy = msgr->get_policy(connect.host_type); ldout(msgr->cct,10) << "accept of host_type " << connect.host_type - << ", policy.lossy=" << policy.lossy - << dendl; + << ", policy.lossy=" << policy.lossy + << " policy.server=" << policy.server + << " policy.standby=" << policy.standby + << " policy.resetcheck=" << policy.resetcheck + << dendl; memset(&reply, 0, sizeof(reply)); reply.protocol_version = msgr->get_proto_version(peer_type, false); + msgr->lock.Unlock(); // mismatch? ldout(msgr->cct,10) << "accept my proto " << reply.protocol_version << ", their proto " << connect.protocol_version << dendl; if (connect.protocol_version != reply.protocol_version) { reply.tag = CEPH_MSGR_TAG_BADPROTOVER; - msgr->lock.Unlock(); goto reply; } @@ -372,12 +379,9 @@ int Pipe::accept() if (feat_missing) { ldout(msgr->cct,1) << "peer missing required features " << std::hex << feat_missing << std::dec << dendl; reply.tag = CEPH_MSGR_TAG_FEATURES; - msgr->lock.Unlock(); goto reply; } - msgr->lock.Unlock(); - // Check the authorizer. If not good, bail out. if (!msgr->verify_authorizer(connection_state.get(), peer_type, connect.authorizer_protocol, authorizer, @@ -394,15 +398,18 @@ int Pipe::accept() ldout(msgr->cct,10) << "accept: setting up session_security." << dendl; + pipe_lock.Unlock(); msgr->lock.Lock(); + pipe_lock.Lock(); if (msgr->dispatch_queue.stop) goto shutting_down; - + if (state != STATE_ACCEPTING) + goto shutting_down; // existing? existing = msgr->_lookup_pipe(peer_addr); if (existing) { - existing->pipe_lock.Lock(); + existing->pipe_lock.Lock(true); // skip lockdep check (we are locking a second Pipe here) if (connect.global_seq < existing->peer_global_seq) { ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq @@ -526,6 +533,8 @@ int Pipe::accept() assert(0); retry_session: + assert(existing->pipe_lock.is_locked()); + assert(pipe_lock.is_locked()); reply.tag = CEPH_MSGR_TAG_RETRY_SESSION; reply.connect_seq = existing->connect_seq + 1; existing->pipe_lock.Unlock(); @@ -533,8 +542,10 @@ int Pipe::accept() goto reply; reply: + assert(pipe_lock.is_locked()); reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required; reply.authorizer_len = authorizer_reply.length(); + pipe_lock.Unlock(); r = tcp_write((char*)&reply, sizeof(reply)); if (r < 0) goto fail_unlocked; @@ -546,6 +557,8 @@ int Pipe::accept() } replace: + assert(existing->pipe_lock.is_locked()); + assert(pipe_lock.is_locked()); if (connect.features & CEPH_FEATURE_RECONNECT_SEQ) { reply_tag = CEPH_MSGR_TAG_SEQ; existing_seq = existing->in_seq; @@ -596,8 +609,10 @@ int Pipe::accept() open: // open + assert(pipe_lock.is_locked()); connect_seq = connect.connect_seq + 1; peer_global_seq = connect.global_seq; + assert(state == STATE_ACCEPTING); state = STATE_OPEN; ldout(msgr->cct,10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl; @@ -624,8 +639,11 @@ int Pipe::accept() // ok! if (msgr->dispatch_queue.stop) goto shutting_down; + inr removed = msgr->accepting_pipes.erase(this); + assert(removed == 1); register_pipe(); msgr->lock.Unlock(); + pipe_lock.Unlock(); r = tcp_write((char*)&reply, sizeof(reply)); if (r < 0) { @@ -657,7 +675,6 @@ int Pipe::accept() start_writer(); } ldout(msgr->cct,20) << "accept done" << dendl; - pipe_lock.Unlock(); maybe_start_delay_thread(); @@ -690,10 +707,10 @@ int Pipe::accept() if (queued || replaced) start_writer(); } - pipe_lock.Unlock(); return -1; shutting_down: + assert(pipe_lock.is_locked()); msgr->lock.Unlock(); if (msgr->cct->_conf->ms_inject_internal_delays) { @@ -703,11 +720,9 @@ int Pipe::accept() t.sleep(); } - pipe_lock.Lock(); state = STATE_CLOSED; state_closed.set(1); fault(); - pipe_lock.Unlock(); return -1; } @@ -1007,6 +1022,8 @@ int Pipe::connect() ldout(msgr->cct,2) << "connect read error on newly_acked_seq" << dendl; goto fail_locked; } + ldout(msgr->cct,2) << " got newly_acked_seq " << newly_acked_seq + << " vs out_seq " << out_seq << dendl; while (newly_acked_seq > out_seq) { Message *m = _get_next_outgoing(); assert(m); @@ -1101,6 +1118,7 @@ void Pipe::unregister_pipe() msgr->rank_pipe.erase(p); } else { ldout(msgr->cct,10) << "unregister_pipe - not registered" << dendl; + msgr->accepting_pipes.erase(this); // somewhat overkill, but safe. } } @@ -1124,6 +1142,8 @@ void Pipe::requeue_sent() void Pipe::discard_requeued_up_to(uint64_t seq) { ldout(msgr->cct, 10) << "discard_requeued_up_to " << seq << dendl; + if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0) + return; list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST]; while (!rq.empty()) { Message *m = rq.front(); @@ -1135,6 +1155,8 @@ void Pipe::discard_requeued_up_to(uint64_t seq) rq.pop_front(); out_seq++; } + if (rq.empty()) + out_q.erase(CEPH_MSG_PRIO_HIGHEST); } /* @@ -1305,11 +1327,13 @@ void Pipe::stop() */ void Pipe::reader() { - if (state == STATE_ACCEPTING) - accept(); - pipe_lock.Lock(); + if (state == STATE_ACCEPTING) { + accept(); + assert(pipe_lock.is_locked()); + } + // loop. while (state != STATE_CLOSED && state != STATE_CONNECTING) { diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h index b359bc2caf7..5f94305350c 100644 --- a/src/msg/Pipe.h +++ b/src/msg/Pipe.h @@ -247,14 +247,17 @@ class DispatchQueue; void stop(); void _send(Message *m) { + assert(pipe_lock.is_locked()); out_q[m->get_priority()].push_back(m); cond.Signal(); } void _send_keepalive() { + assert(pipe_lock.is_locked()); keepalive = true; cond.Signal(); } Message *_get_next_outgoing() { + assert(pipe_lock.is_locked()); Message *m = 0; while (!m && !out_q.empty()) { map<int, list<Message*> >::reverse_iterator p = out_q.rbegin(); diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 48e37d87098..afee0952630 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -276,9 +276,10 @@ int SimpleMessenger::bind(const entity_addr_t &bind_addr) int SimpleMessenger::rebind(const set<int>& avoid_ports) { ldout(cct,1) << "rebind avoid " << avoid_ports << dendl; - mark_down_all(); assert(did_bind); - return accepter.rebind(avoid_ports); + int r = accepter.rebind(avoid_ports); + mark_down_all(); + return r; } int SimpleMessenger::start() @@ -311,6 +312,7 @@ Pipe *SimpleMessenger::add_accept_pipe(int sd) p->start_reader(); p->pipe_lock.Unlock(); pipes.insert(p); + accepting_pipes.insert(p); lock.Unlock(); return p; } @@ -559,6 +561,18 @@ void SimpleMessenger::mark_down_all() { ldout(cct,1) << "mark_down_all" << dendl; lock.Lock(); + for (set<Pipe*>::iterator q = accepting_pipes.begin(); q != accepting_pipes.end(); ++q) { + Pipe *p = *q; + ldout(cct,5) << "mark_down_all accepting_pipe " << p << dendl; + p->pipe_lock.Lock(); + p->stop(); + ConnectionRef con = p->connection_state; + if (con && con->clear_pipe(p)) + dispatch_queue.queue_reset(con.get()); + p->pipe_lock.Unlock(); + } + accepting_pipes.clear(); + while (!rank_pipe.empty()) { hash_map<entity_addr_t,Pipe*>::iterator it = rank_pipe.begin(); Pipe *p = it->second; diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 47ee145aa5e..4538b0f18bc 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -514,6 +514,12 @@ private: * invalid and can be replaced by anyone holding the msgr lock */ hash_map<entity_addr_t, Pipe*> rank_pipe; + /** + * list of pipes are in teh process of accepting + * + * These are not yet in the rank_pipe map. + */ + set<Pipe*> accepting_pipes; /// a set of all the Pipes we have which are somehow active set<Pipe*> pipes; /// a list of Pipes we want to tear down diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 109683ff051..5b8ef307dd1 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -5568,7 +5568,10 @@ bool OSD::require_same_or_newer_map(OpRequestRef op, epoch_t epoch) int from = m->get_source().num(); if (!osdmap->have_inst(from) || osdmap->get_cluster_addr(from) != m->get_source_inst().addr) { - dout(10) << "from dead osd." << from << ", marking down" << dendl; + dout(5) << "from dead osd." << from << ", marking down, " + << " msg was " << m->get_source_inst().addr + << " expected " << (osdmap->have_inst(from) ? osdmap->get_cluster_addr(from) : entity_addr_t()) + << dendl; cluster_messenger->mark_down(m->get_connection()); return false; } |