diff options
author | Gregory Farnum <greg@inktank.com> | 2013-06-23 22:42:07 -0700 |
---|---|---|
committer | Gregory Farnum <greg@inktank.com> | 2013-06-23 22:42:07 -0700 |
commit | 134d08a9654f66634b893d493e4a92f38acc63cf (patch) | |
tree | f1178a3d903bff5aa2e8b84df25064e52acdebf4 | |
parent | b89d7420e3501247d6ed282d2253c95c758526b1 (diff) | |
parent | 57dc73627eaea0d8f928fd46251accb02e6d8b02 (diff) | |
download | ceph-134d08a9654f66634b893d493e4a92f38acc63cf.tar.gz |
Merge pull request #375 from ceph/wip-msgr
misc msgr fixes
Reviewed-by: Greg Farnum <greg@inktank.com>
-rw-r--r-- | src/msg/Pipe.cc | 110 | ||||
-rw-r--r-- | src/msg/SimpleMessenger.cc | 21 |
2 files changed, 71 insertions, 60 deletions
diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc index 4eb3d266937..b581e367332 100644 --- a/src/msg/Pipe.cc +++ b/src/msg/Pipe.cc @@ -216,69 +216,78 @@ int Pipe::accept() // my creater gave me sd via accept() assert(state == STATE_ACCEPTING); + // vars + bufferlist addrs; + entity_addr_t socket_addr; + socklen_t len; + int r; + char banner[strlen(CEPH_BANNER)+1]; + bufferlist addrbl; + ceph_msg_connect connect; + ceph_msg_connect_reply reply; + Pipe *existing = 0; + bufferptr bp; + bufferlist authorizer, authorizer_reply; + bool authorizer_valid; + uint64_t feat_missing; + bool replaced = false; + CryptoKey session_key; + + // this should roughly mirror pseudocode at + // http://ceph.newdream.net/wiki/Messaging_protocol + int reply_tag = 0; + uint64_t existing_seq = -1; + + // used for reading in the remote acked seq on connect + uint64_t newly_acked_seq = 0; + // announce myself. - int rc = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER)); - if (rc < 0) { + r = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER)); + if (r < 0) { ldout(msgr->cct,10) << "accept couldn't write banner" << dendl; - state = STATE_CLOSED; - state_closed.set(1); - return -1; + goto fail_unlocked; } // and my addr - bufferlist addrs; ::encode(msgr->my_inst.addr, addrs); port = msgr->my_inst.addr.get_port(); // and peer's socket addr (they might not know their ip) - entity_addr_t socket_addr; - socklen_t len = sizeof(socket_addr.ss_addr()); - int r = ::getpeername(sd, (sockaddr*)&socket_addr.ss_addr(), &len); + len = sizeof(socket_addr.ss_addr()); + r = ::getpeername(sd, (sockaddr*)&socket_addr.ss_addr(), &len); if (r < 0) { char buf[80]; ldout(msgr->cct,0) << "accept failed to getpeername " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl; - state = STATE_CLOSED; - state_closed.set(1); - return -1; + goto fail_unlocked; } ::encode(socket_addr, addrs); - rc = tcp_write(addrs.c_str(), addrs.length()); - if (rc < 0) { + r = tcp_write(addrs.c_str(), addrs.length()); + if (r < 0) { ldout(msgr->cct,10) << "accept couldn't write my+peer addr" << dendl; - state = STATE_CLOSED; - state_closed.set(1); - return -1; + goto fail_unlocked; } ldout(msgr->cct,1) << "accept sd=" << sd << " " << socket_addr << dendl; // identify peer - char banner[strlen(CEPH_BANNER)+1]; if (tcp_read(banner, strlen(CEPH_BANNER)) < 0) { ldout(msgr->cct,10) << "accept couldn't read banner" << dendl; - state = STATE_CLOSED; - state_closed.set(1); - return -1; + goto fail_unlocked; } if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) { banner[strlen(CEPH_BANNER)] = 0; ldout(msgr->cct,1) << "accept peer sent bad banner '" << banner << "' (should be '" << CEPH_BANNER << "')" << dendl; - state = STATE_CLOSED; - state_closed.set(1); - return -1; + goto fail_unlocked; } - bufferlist addrbl; { bufferptr tp(sizeof(peer_addr)); addrbl.push_back(tp); } if (tcp_read(addrbl.c_str(), addrbl.length()) < 0) { ldout(msgr->cct,10) << "accept couldn't read peer_addr" << dendl; - state = STATE_CLOSED; - state_closed.set(1); - return -1; + goto fail_unlocked; } { bufferlist::iterator ti = addrbl.begin(); @@ -296,24 +305,6 @@ int Pipe::accept() } set_peer_addr(peer_addr); // so that connection_state gets set up - ceph_msg_connect connect; - ceph_msg_connect_reply reply; - Pipe *existing = 0; - bufferptr bp; - bufferlist authorizer, authorizer_reply; - bool authorizer_valid; - uint64_t feat_missing; - bool replaced = false; - CryptoKey session_key; - - // this should roughly mirror pseudocode at - // http://ceph.newdream.net/wiki/Messaging_protocol - int reply_tag = 0; - uint64_t existing_seq = -1; - - // used for reading in the remote acked seq on connect - uint64_t newly_acked_seq = 0; - while (1) { if (tcp_read((char*)&connect, sizeof(connect)) < 0) { ldout(msgr->cct,10) << "accept couldn't read connect" << dendl; @@ -544,12 +535,12 @@ int Pipe::accept() reply: reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required; reply.authorizer_len = authorizer_reply.length(); - rc = tcp_write((char*)&reply, sizeof(reply)); - if (rc < 0) + r = tcp_write((char*)&reply, sizeof(reply)); + if (r < 0) goto fail_unlocked; if (reply.authorizer_len) { - rc = tcp_write(authorizer_reply.c_str(), authorizer_reply.length()); - if (rc < 0) + r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length()); + if (r < 0) goto fail_unlocked; } } @@ -564,7 +555,12 @@ int Pipe::accept() existing->unregister_pipe(); replaced = true; - if (!existing->policy.lossy) { + if (existing->policy.lossy) { + // disconnect from the Connection + assert(existing->connection_state); + if (existing->connection_state->clear_pipe(existing)) + msgr->dispatch_queue.queue_reset(existing->connection_state.get()); + } else { // queue a reset on the old connection msgr->dispatch_queue.queue_reset(connection_state.get()); @@ -574,7 +570,7 @@ int Pipe::accept() connection_state = existing->connection_state; // make existing Connection reference us - existing->connection_state->reset_pipe(this); + connection_state->reset_pipe(this); // flush/queue any existing delayed messages if (existing->delay_thread) @@ -631,20 +627,20 @@ int Pipe::accept() register_pipe(); msgr->lock.Unlock(); - rc = tcp_write((char*)&reply, sizeof(reply)); - if (rc < 0) { + r = tcp_write((char*)&reply, sizeof(reply)); + if (r < 0) { goto fail_registered; } if (reply.authorizer_len) { - rc = tcp_write(authorizer_reply.c_str(), authorizer_reply.length()); - if (rc < 0) { + r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length()); + if (r < 0) { goto fail_registered; } } if (reply_tag == CEPH_MSGR_TAG_SEQ) { - if(tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) { + if (tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) { ldout(msgr->cct,2) << "accept write error on in_seq" << dendl; goto fail_registered; } diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index f1e614628df..48e37d87098 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -222,6 +222,13 @@ void SimpleMessenger::reaper() ldout(cct,10) << "reaper reaping pipe " << p << " " << p->get_peer_addr() << dendl; p->pipe_lock.Lock(); p->discard_out_queue(); + if (p->connection_state) { + // mark_down, mark_down_all, or fault() should have done this, + // or accept() may have switch the Connection to a different + // Pipe... but make sure! + bool cleared = p->connection_state->clear_pipe(p); + assert(!cleared); + } p->pipe_lock.Unlock(); p->unregister_pipe(); assert(pipes.count(p)); @@ -230,8 +237,6 @@ void SimpleMessenger::reaper() if (p->sd >= 0) ::close(p->sd); ldout(cct,10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl; - if (p->connection_state) - p->connection_state->clear_pipe(p); p->put(); ldout(cct,10) << "reaper deleted pipe " << p << dendl; } @@ -563,9 +568,9 @@ void SimpleMessenger::mark_down_all() p->pipe_lock.Lock(); p->stop(); ConnectionRef con = p->connection_state; - p->pipe_lock.Unlock(); if (con && con->clear_pipe(p)) dispatch_queue.queue_reset(con.get()); + p->pipe_lock.Unlock(); } lock.Unlock(); } @@ -579,6 +584,11 @@ void SimpleMessenger::mark_down(const entity_addr_t& addr) p->unregister_pipe(); p->pipe_lock.Lock(); p->stop(); + if (p->connection_state) { + // do not generate a reset event for the caller in this case, + // since they asked for it. + p->connection_state->clear_pipe(p); + } p->pipe_lock.Unlock(); } else { ldout(cct,1) << "mark_down " << addr << " -- pipe dne" << dendl; @@ -598,6 +608,11 @@ void SimpleMessenger::mark_down(Connection *con) p->unregister_pipe(); p->pipe_lock.Lock(); p->stop(); + if (p->connection_state) { + // do not generate a reset event for the caller in this case, + // since they asked for it. + p->connection_state->clear_pipe(p); + } p->pipe_lock.Unlock(); p->put(); } else { |