summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-06-17 14:14:02 -0700
committerSage Weil <sage@inktank.com>2013-06-23 15:10:24 -0700
commitec612a5bda119cea52bbac9b2a49ecf1e83b08e5 (patch)
tree1740c87b7c0b008822b8e0bcab3d69550c609ad2
parentafafb87e8402242d3897069f4b94ba46ffe0c413 (diff)
downloadceph-ec612a5bda119cea52bbac9b2a49ecf1e83b08e5.tar.gz
msg/Pipe: goto fail_unlocked on early failures in accept()
Instead of duplicating an incomplete cleanup sequence (that does not clear_pipe()), goto fail_unlocked and do the cleanup in a generic way. s/rc/r/ while we are here. Signed-off-by: Sage Weil <sage@inktank.com>
-rw-r--r--src/msg/Pipe.cc101
1 files changed, 46 insertions, 55 deletions
diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc
index 4eb3d266937..e425bc1bb2d 100644
--- a/src/msg/Pipe.cc
+++ b/src/msg/Pipe.cc
@@ -216,69 +216,78 @@ int Pipe::accept()
// my creater gave me sd via accept()
assert(state == STATE_ACCEPTING);
+ // vars
+ bufferlist addrs;
+ entity_addr_t socket_addr;
+ socklen_t len;
+ int r;
+ char banner[strlen(CEPH_BANNER)+1];
+ bufferlist addrbl;
+ ceph_msg_connect connect;
+ ceph_msg_connect_reply reply;
+ Pipe *existing = 0;
+ bufferptr bp;
+ bufferlist authorizer, authorizer_reply;
+ bool authorizer_valid;
+ uint64_t feat_missing;
+ bool replaced = false;
+ CryptoKey session_key;
+
+ // this should roughly mirror pseudocode at
+ // http://ceph.newdream.net/wiki/Messaging_protocol
+ int reply_tag = 0;
+ uint64_t existing_seq = -1;
+
+ // used for reading in the remote acked seq on connect
+ uint64_t newly_acked_seq = 0;
+
// announce myself.
- int rc = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER));
- if (rc < 0) {
+ r = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER));
+ if (r < 0) {
ldout(msgr->cct,10) << "accept couldn't write banner" << dendl;
- state = STATE_CLOSED;
- state_closed.set(1);
- return -1;
+ goto fail_unlocked;
}
// and my addr
- bufferlist addrs;
::encode(msgr->my_inst.addr, addrs);
port = msgr->my_inst.addr.get_port();
// and peer's socket addr (they might not know their ip)
- entity_addr_t socket_addr;
- socklen_t len = sizeof(socket_addr.ss_addr());
- int r = ::getpeername(sd, (sockaddr*)&socket_addr.ss_addr(), &len);
+ len = sizeof(socket_addr.ss_addr());
+ r = ::getpeername(sd, (sockaddr*)&socket_addr.ss_addr(), &len);
if (r < 0) {
char buf[80];
ldout(msgr->cct,0) << "accept failed to getpeername " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl;
- state = STATE_CLOSED;
- state_closed.set(1);
- return -1;
+ goto fail_unlocked;
}
::encode(socket_addr, addrs);
- rc = tcp_write(addrs.c_str(), addrs.length());
- if (rc < 0) {
+ r = tcp_write(addrs.c_str(), addrs.length());
+ if (r < 0) {
ldout(msgr->cct,10) << "accept couldn't write my+peer addr" << dendl;
- state = STATE_CLOSED;
- state_closed.set(1);
- return -1;
+ goto fail_unlocked;
}
ldout(msgr->cct,1) << "accept sd=" << sd << " " << socket_addr << dendl;
// identify peer
- char banner[strlen(CEPH_BANNER)+1];
if (tcp_read(banner, strlen(CEPH_BANNER)) < 0) {
ldout(msgr->cct,10) << "accept couldn't read banner" << dendl;
- state = STATE_CLOSED;
- state_closed.set(1);
- return -1;
+ goto fail_unlocked;
}
if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
banner[strlen(CEPH_BANNER)] = 0;
ldout(msgr->cct,1) << "accept peer sent bad banner '" << banner << "' (should be '" << CEPH_BANNER << "')" << dendl;
- state = STATE_CLOSED;
- state_closed.set(1);
- return -1;
+ goto fail_unlocked;
}
- bufferlist addrbl;
{
bufferptr tp(sizeof(peer_addr));
addrbl.push_back(tp);
}
if (tcp_read(addrbl.c_str(), addrbl.length()) < 0) {
ldout(msgr->cct,10) << "accept couldn't read peer_addr" << dendl;
- state = STATE_CLOSED;
- state_closed.set(1);
- return -1;
+ goto fail_unlocked;
}
{
bufferlist::iterator ti = addrbl.begin();
@@ -296,24 +305,6 @@ int Pipe::accept()
}
set_peer_addr(peer_addr); // so that connection_state gets set up
- ceph_msg_connect connect;
- ceph_msg_connect_reply reply;
- Pipe *existing = 0;
- bufferptr bp;
- bufferlist authorizer, authorizer_reply;
- bool authorizer_valid;
- uint64_t feat_missing;
- bool replaced = false;
- CryptoKey session_key;
-
- // this should roughly mirror pseudocode at
- // http://ceph.newdream.net/wiki/Messaging_protocol
- int reply_tag = 0;
- uint64_t existing_seq = -1;
-
- // used for reading in the remote acked seq on connect
- uint64_t newly_acked_seq = 0;
-
while (1) {
if (tcp_read((char*)&connect, sizeof(connect)) < 0) {
ldout(msgr->cct,10) << "accept couldn't read connect" << dendl;
@@ -544,12 +535,12 @@ int Pipe::accept()
reply:
reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
reply.authorizer_len = authorizer_reply.length();
- rc = tcp_write((char*)&reply, sizeof(reply));
- if (rc < 0)
+ r = tcp_write((char*)&reply, sizeof(reply));
+ if (r < 0)
goto fail_unlocked;
if (reply.authorizer_len) {
- rc = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
- if (rc < 0)
+ r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
+ if (r < 0)
goto fail_unlocked;
}
}
@@ -631,20 +622,20 @@ int Pipe::accept()
register_pipe();
msgr->lock.Unlock();
- rc = tcp_write((char*)&reply, sizeof(reply));
- if (rc < 0) {
+ r = tcp_write((char*)&reply, sizeof(reply));
+ if (r < 0) {
goto fail_registered;
}
if (reply.authorizer_len) {
- rc = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
- if (rc < 0) {
+ r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
+ if (r < 0) {
goto fail_registered;
}
}
if (reply_tag == CEPH_MSGR_TAG_SEQ) {
- if(tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) {
+ if (tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) {
ldout(msgr->cct,2) << "accept write error on in_seq" << dendl;
goto fail_registered;
}