summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGregory Farnum <greg@inktank.com>2013-06-23 22:42:07 -0700
committerGregory Farnum <greg@inktank.com>2013-06-23 22:42:07 -0700
commit134d08a9654f66634b893d493e4a92f38acc63cf (patch)
treef1178a3d903bff5aa2e8b84df25064e52acdebf4
parentb89d7420e3501247d6ed282d2253c95c758526b1 (diff)
parent57dc73627eaea0d8f928fd46251accb02e6d8b02 (diff)
downloadceph-134d08a9654f66634b893d493e4a92f38acc63cf.tar.gz
Merge pull request #375 from ceph/wip-msgr
misc msgr fixes Reviewed-by: Greg Farnum <greg@inktank.com>
-rw-r--r--src/msg/Pipe.cc110
-rw-r--r--src/msg/SimpleMessenger.cc21
2 files changed, 71 insertions, 60 deletions
diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc
index 4eb3d266937..b581e367332 100644
--- a/src/msg/Pipe.cc
+++ b/src/msg/Pipe.cc
@@ -216,69 +216,78 @@ int Pipe::accept()
// my creater gave me sd via accept()
assert(state == STATE_ACCEPTING);
+ // vars
+ bufferlist addrs;
+ entity_addr_t socket_addr;
+ socklen_t len;
+ int r;
+ char banner[strlen(CEPH_BANNER)+1];
+ bufferlist addrbl;
+ ceph_msg_connect connect;
+ ceph_msg_connect_reply reply;
+ Pipe *existing = 0;
+ bufferptr bp;
+ bufferlist authorizer, authorizer_reply;
+ bool authorizer_valid;
+ uint64_t feat_missing;
+ bool replaced = false;
+ CryptoKey session_key;
+
+ // this should roughly mirror pseudocode at
+ // http://ceph.newdream.net/wiki/Messaging_protocol
+ int reply_tag = 0;
+ uint64_t existing_seq = -1;
+
+ // used for reading in the remote acked seq on connect
+ uint64_t newly_acked_seq = 0;
+
// announce myself.
- int rc = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER));
- if (rc < 0) {
+ r = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER));
+ if (r < 0) {
ldout(msgr->cct,10) << "accept couldn't write banner" << dendl;
- state = STATE_CLOSED;
- state_closed.set(1);
- return -1;
+ goto fail_unlocked;
}
// and my addr
- bufferlist addrs;
::encode(msgr->my_inst.addr, addrs);
port = msgr->my_inst.addr.get_port();
// and peer's socket addr (they might not know their ip)
- entity_addr_t socket_addr;
- socklen_t len = sizeof(socket_addr.ss_addr());
- int r = ::getpeername(sd, (sockaddr*)&socket_addr.ss_addr(), &len);
+ len = sizeof(socket_addr.ss_addr());
+ r = ::getpeername(sd, (sockaddr*)&socket_addr.ss_addr(), &len);
if (r < 0) {
char buf[80];
ldout(msgr->cct,0) << "accept failed to getpeername " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl;
- state = STATE_CLOSED;
- state_closed.set(1);
- return -1;
+ goto fail_unlocked;
}
::encode(socket_addr, addrs);
- rc = tcp_write(addrs.c_str(), addrs.length());
- if (rc < 0) {
+ r = tcp_write(addrs.c_str(), addrs.length());
+ if (r < 0) {
ldout(msgr->cct,10) << "accept couldn't write my+peer addr" << dendl;
- state = STATE_CLOSED;
- state_closed.set(1);
- return -1;
+ goto fail_unlocked;
}
ldout(msgr->cct,1) << "accept sd=" << sd << " " << socket_addr << dendl;
// identify peer
- char banner[strlen(CEPH_BANNER)+1];
if (tcp_read(banner, strlen(CEPH_BANNER)) < 0) {
ldout(msgr->cct,10) << "accept couldn't read banner" << dendl;
- state = STATE_CLOSED;
- state_closed.set(1);
- return -1;
+ goto fail_unlocked;
}
if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
banner[strlen(CEPH_BANNER)] = 0;
ldout(msgr->cct,1) << "accept peer sent bad banner '" << banner << "' (should be '" << CEPH_BANNER << "')" << dendl;
- state = STATE_CLOSED;
- state_closed.set(1);
- return -1;
+ goto fail_unlocked;
}
- bufferlist addrbl;
{
bufferptr tp(sizeof(peer_addr));
addrbl.push_back(tp);
}
if (tcp_read(addrbl.c_str(), addrbl.length()) < 0) {
ldout(msgr->cct,10) << "accept couldn't read peer_addr" << dendl;
- state = STATE_CLOSED;
- state_closed.set(1);
- return -1;
+ goto fail_unlocked;
}
{
bufferlist::iterator ti = addrbl.begin();
@@ -296,24 +305,6 @@ int Pipe::accept()
}
set_peer_addr(peer_addr); // so that connection_state gets set up
- ceph_msg_connect connect;
- ceph_msg_connect_reply reply;
- Pipe *existing = 0;
- bufferptr bp;
- bufferlist authorizer, authorizer_reply;
- bool authorizer_valid;
- uint64_t feat_missing;
- bool replaced = false;
- CryptoKey session_key;
-
- // this should roughly mirror pseudocode at
- // http://ceph.newdream.net/wiki/Messaging_protocol
- int reply_tag = 0;
- uint64_t existing_seq = -1;
-
- // used for reading in the remote acked seq on connect
- uint64_t newly_acked_seq = 0;
-
while (1) {
if (tcp_read((char*)&connect, sizeof(connect)) < 0) {
ldout(msgr->cct,10) << "accept couldn't read connect" << dendl;
@@ -544,12 +535,12 @@ int Pipe::accept()
reply:
reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
reply.authorizer_len = authorizer_reply.length();
- rc = tcp_write((char*)&reply, sizeof(reply));
- if (rc < 0)
+ r = tcp_write((char*)&reply, sizeof(reply));
+ if (r < 0)
goto fail_unlocked;
if (reply.authorizer_len) {
- rc = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
- if (rc < 0)
+ r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
+ if (r < 0)
goto fail_unlocked;
}
}
@@ -564,7 +555,12 @@ int Pipe::accept()
existing->unregister_pipe();
replaced = true;
- if (!existing->policy.lossy) {
+ if (existing->policy.lossy) {
+ // disconnect from the Connection
+ assert(existing->connection_state);
+ if (existing->connection_state->clear_pipe(existing))
+ msgr->dispatch_queue.queue_reset(existing->connection_state.get());
+ } else {
// queue a reset on the old connection
msgr->dispatch_queue.queue_reset(connection_state.get());
@@ -574,7 +570,7 @@ int Pipe::accept()
connection_state = existing->connection_state;
// make existing Connection reference us
- existing->connection_state->reset_pipe(this);
+ connection_state->reset_pipe(this);
// flush/queue any existing delayed messages
if (existing->delay_thread)
@@ -631,20 +627,20 @@ int Pipe::accept()
register_pipe();
msgr->lock.Unlock();
- rc = tcp_write((char*)&reply, sizeof(reply));
- if (rc < 0) {
+ r = tcp_write((char*)&reply, sizeof(reply));
+ if (r < 0) {
goto fail_registered;
}
if (reply.authorizer_len) {
- rc = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
- if (rc < 0) {
+ r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
+ if (r < 0) {
goto fail_registered;
}
}
if (reply_tag == CEPH_MSGR_TAG_SEQ) {
- if(tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) {
+ if (tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) {
ldout(msgr->cct,2) << "accept write error on in_seq" << dendl;
goto fail_registered;
}
diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc
index f1e614628df..48e37d87098 100644
--- a/src/msg/SimpleMessenger.cc
+++ b/src/msg/SimpleMessenger.cc
@@ -222,6 +222,13 @@ void SimpleMessenger::reaper()
ldout(cct,10) << "reaper reaping pipe " << p << " " << p->get_peer_addr() << dendl;
p->pipe_lock.Lock();
p->discard_out_queue();
+ if (p->connection_state) {
+ // mark_down, mark_down_all, or fault() should have done this,
+ // or accept() may have switch the Connection to a different
+ // Pipe... but make sure!
+ bool cleared = p->connection_state->clear_pipe(p);
+ assert(!cleared);
+ }
p->pipe_lock.Unlock();
p->unregister_pipe();
assert(pipes.count(p));
@@ -230,8 +237,6 @@ void SimpleMessenger::reaper()
if (p->sd >= 0)
::close(p->sd);
ldout(cct,10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl;
- if (p->connection_state)
- p->connection_state->clear_pipe(p);
p->put();
ldout(cct,10) << "reaper deleted pipe " << p << dendl;
}
@@ -563,9 +568,9 @@ void SimpleMessenger::mark_down_all()
p->pipe_lock.Lock();
p->stop();
ConnectionRef con = p->connection_state;
- p->pipe_lock.Unlock();
if (con && con->clear_pipe(p))
dispatch_queue.queue_reset(con.get());
+ p->pipe_lock.Unlock();
}
lock.Unlock();
}
@@ -579,6 +584,11 @@ void SimpleMessenger::mark_down(const entity_addr_t& addr)
p->unregister_pipe();
p->pipe_lock.Lock();
p->stop();
+ if (p->connection_state) {
+ // do not generate a reset event for the caller in this case,
+ // since they asked for it.
+ p->connection_state->clear_pipe(p);
+ }
p->pipe_lock.Unlock();
} else {
ldout(cct,1) << "mark_down " << addr << " -- pipe dne" << dendl;
@@ -598,6 +608,11 @@ void SimpleMessenger::mark_down(Connection *con)
p->unregister_pipe();
p->pipe_lock.Lock();
p->stop();
+ if (p->connection_state) {
+ // do not generate a reset event for the caller in this case,
+ // since they asked for it.
+ p->connection_state->clear_pipe(p);
+ }
p->pipe_lock.Unlock();
p->put();
} else {