diff options
author | Sage Weil <sage@newdream.net> | 2012-04-03 13:44:29 -0700 |
---|---|---|
committer | Sage Weil <sage@newdream.net> | 2012-04-03 13:44:29 -0700 |
commit | 1ef37ab8cbbba23aa169ba55ee43ff6354e55e09 (patch) | |
tree | 33a58276825de27b40072eaa29071e394c51fe8f | |
parent | a31efd9c99a6319e672be73c39fae7804cba3f5f (diff) | |
parent | 756621d50132f0d619ebf31919e6ac1b89d30eb4 (diff) | |
download | ceph-1ef37ab8cbbba23aa169ba55ee43ff6354e55e09.tar.gz |
Merge remote-tracking branch 'gh/msgr-api-changes'
Reviewed-by: Sage Weil <sage@newdream.net>
-rw-r--r-- | src/common/buffer.cc | 15 | ||||
-rw-r--r-- | src/include/buffer.h | 1 | ||||
-rw-r--r-- | src/msg/Dispatcher.h | 14 | ||||
-rw-r--r-- | src/msg/Messenger.h | 54 | ||||
-rw-r--r-- | src/msg/SimpleMessenger.cc | 126 | ||||
-rw-r--r-- | src/msg/SimpleMessenger.h | 39 | ||||
-rw-r--r-- | src/msg/tcp.cc | 30 | ||||
-rw-r--r-- | src/osd/OSD.cc | 4 |
8 files changed, 160 insertions, 123 deletions
diff --git a/src/common/buffer.cc b/src/common/buffer.cc index 893d8a7382f..a6ad8c0d736 100644 --- a/src/common/buffer.cc +++ b/src/common/buffer.cc @@ -611,6 +611,21 @@ bool buffer_track_alloc = get_env_bool("CEPH_BUFFER_TRACK"); other.last_p = other.begin(); } + bool buffer::list::contents_equal(ceph::buffer::list& other) + { + if (length() != other.length()) + return false; + bufferlist::iterator me = begin(); + bufferlist::iterator him = other.begin(); + while (!me.end()) { + if (*me != *him) + return false; + ++me; + ++him; + } + return true; + } + bool buffer::list::is_page_aligned() const { for (std::list<ptr>::const_iterator it = _buffers.begin(); diff --git a/src/include/buffer.h b/src/include/buffer.h index 12dec986ea9..65c101f6f96 100644 --- a/src/include/buffer.h +++ b/src/include/buffer.h @@ -325,6 +325,7 @@ public: #endif return _len; } + bool contents_equal(buffer::list& other); bool is_page_aligned() const; bool is_n_page_sized() const; diff --git a/src/msg/Dispatcher.h b/src/msg/Dispatcher.h index 16cd03d36c8..d2c9db85531 100644 --- a/src/msg/Dispatcher.h +++ b/src/msg/Dispatcher.h @@ -37,14 +37,20 @@ public: virtual void ms_handle_connect(Connection *con) { }; /* - * on any connection reset. * this indicates that the ordered+reliable delivery semantics have - * been violated. messages may have been lost. + * been violated. Messages may have been lost due to a fault + * in the network connection. + * Only called on lossy Connections or those you've + * designated mark_down_on_empty(). */ virtual bool ms_handle_reset(Connection *con) = 0; - // on deliberate reset of connection by remote - // implies incoming messages dropped; possibly/probably some of our previous outgoing too. + /** + * This indicates that the ordered+reliable delivery semantics + * have been violated because the remote somehow reset. + * It implies that incoming messages were dropped, and + * probably some of our previous outgoing messages were too. + */ virtual void ms_handle_remote_reset(Connection *con) = 0; // authorization handshake provides mutual authentication of peers. diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 61e35e1bff2..730fa1a68ec 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -74,7 +74,7 @@ private: protected: /// the "name" of the local daemon. eg client.99 - entity_name_t _my_name; + entity_inst_t my_inst; int default_send_priority; /// set to true once the Messenger has started, and set to false on shutdown bool started; @@ -82,10 +82,11 @@ protected: public: CephContext *cct; Messenger(CephContext *cct_, entity_name_t w) - : default_send_priority(CEPH_MSG_PRIO_DEFAULT), started(false), + : my_inst(), + default_send_priority(CEPH_MSG_PRIO_DEFAULT), started(false), cct(cct_) { - _my_name = w; + my_inst.name = w; } virtual ~Messenger() {} @@ -93,10 +94,25 @@ protected: } // accessors - entity_name_t get_myname() { return _my_name; } - virtual entity_addr_t get_myaddr() = 0; - virtual void set_ip(entity_addr_t &addr) = 0; - entity_inst_t get_myinst() { return entity_inst_t(get_myname(), get_myaddr()); } + const entity_name_t& get_myname() { return my_inst.name; } + /** + * Retrieve the Messenger's address. + * + * @return A copy of the address this Messenger currently + * believes to be its own. + */ + const entity_addr_t& get_myaddr() { return my_inst.addr; } + /** + * Set the unknown address components for this Messenger. + * This is useful if the Messenger doesn't know its full address just by + * binding, but another Messenger on the same interface has already learned + * its full address. This function does not fill in known address elements, + * cause a rebind, or do anything of that sort. + * + * @param addr The address to use as a template. + */ + virtual void set_addr_unknowns(entity_addr_t &addr) = 0; + const entity_inst_t& get_myinst() { return my_inst; } /** * Set the name of the local entity. The name is reported to others and @@ -105,7 +121,7 @@ protected: * * @param m The name to set. */ - void set_myname(const entity_name_t m) { _my_name = m; } + void set_myname(const entity_name_t m) { my_inst.name = m; } /** * Set the default send priority @@ -244,13 +260,33 @@ protected: virtual int lazy_send_message(Message *m, Connection *con) = 0; virtual int send_keepalive(const entity_inst_t& dest) = 0; virtual int send_keepalive(Connection *con) = 0; - + /** + * Mark down a connection to a remote. This will cause us to + * discard our outgoing queue for them, and if they try + * to reconnect they will discard their queue when we + * inform them of the session reset. + * It does not generate any notifications to he Dispatcher. + */ virtual void mark_down(const entity_addr_t& a) = 0; virtual void mark_down(Connection *con) = 0; + /** + * Unlike mark_down, this function will try and deliver + * all messages before ending the connection. But the + * messages are not delivered reliably, and once they've + * all been sent out the Connection will be closed and + * generate an ms_handle_reset notification to the + * Dispatcher. + */ virtual void mark_down_on_empty(Connection *con) = 0; virtual void mark_disposable(Connection *con) = 0; virtual void mark_down_all() = 0; + /** + * Get the Connection object associated with a given entity. If a + * Connection does not exist, create one and establish a logical connection. + * + * @param dest The entity to get a connection for. + */ virtual Connection *get_connection(const entity_inst_t& dest) = 0; virtual int rebind(int avoid_port) { return -EOPNOTSUPP; } diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index a1e1cc4edc7..fd930318e7b 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -46,7 +46,7 @@ #undef dout_prefix #define dout_prefix _prefix(_dout, msgr) static ostream& _prefix(std::ostream *_dout, SimpleMessenger *msgr) { - return *_dout << "-- " << msgr->ms_addr << " "; + return *_dout << "-- " << msgr->get_myaddr() << " "; } @@ -145,20 +145,20 @@ int SimpleMessenger::Accepter::bind(entity_addr_t &bind_addr, int avoid_port1, i return -errno; } - msgr->ms_addr = bind_addr; - if (msgr->ms_addr != entity_addr_t()) + msgr->my_inst.addr = bind_addr; + if (msgr->my_inst.addr != entity_addr_t()) msgr->need_addr = false; else msgr->need_addr = true; - if (msgr->ms_addr.get_port() == 0) { - msgr->ms_addr = listen_addr; - msgr->ms_addr.nonce = msgr->nonce; + if (msgr->my_inst.addr.get_port() == 0) { + msgr->my_inst.addr = listen_addr; + msgr->my_inst.addr.nonce = msgr->nonce; } msgr->init_local_pipe(); - ldout(msgr->cct,1) << "accepter.bind ms_addr is " << msgr->ms_addr << " need_addr=" << msgr->need_addr << dendl; + ldout(msgr->cct,1) << "accepter.bind my_inst.addr is " << msgr->my_inst.addr << " need_addr=" << msgr->need_addr << dendl; msgr->did_bind = true; return 0; } @@ -170,7 +170,7 @@ int SimpleMessenger::Accepter::rebind(int avoid_port) stop(); - entity_addr_t addr = msgr->ms_addr; + entity_addr_t addr = msgr->my_inst.addr; int old_port = addr.get_port(); addr.set_port(0); @@ -487,22 +487,16 @@ int SimpleMessenger::lazy_send_message(Message *m, const entity_inst_t& dest) return 0; } -entity_addr_t SimpleMessenger::get_myaddr() -{ - entity_addr_t a = ms_addr; - return a; -} - /** - * If ms_addr doesn't have an IP set, this function + * If my_inst.addr doesn't have an IP set, this function * will fill it in from the passed addr. Otherwise it does nothing and returns. */ -void SimpleMessenger::set_ip(entity_addr_t &addr) +void SimpleMessenger::set_addr_unknowns(entity_addr_t &addr) { - if (ms_addr.is_blank_ip()) { - int port = ms_addr.get_port(); - ms_addr.addr = addr.addr; - ms_addr.set_port(port); + if (my_inst.addr.is_blank_ip()) { + int port = my_inst.addr.get_port(); + my_inst.addr.addr = addr.addr; + my_inst.addr.set_port(port); } } @@ -539,7 +533,7 @@ int SimpleMessenger::get_proto_version(int peer_type, bool connect) #undef dout_prefix #define dout_prefix _pipe_prefix(_dout) ostream& SimpleMessenger::Pipe::_pipe_prefix(std::ostream *_dout) { - return *_dout << "-- " << msgr->ms_addr << " >> " << peer_addr << " pipe(" << this + return *_dout << "-- " << msgr->my_inst.addr << " >> " << peer_addr << " pipe(" << this << " sd=" << sd << " pgs=" << peer_global_seq << " cs=" << connect_seq @@ -626,7 +620,7 @@ int SimpleMessenger::Pipe::accept() // and my addr bufferlist addrs; - ::encode(msgr->ms_addr, addrs); + ::encode(msgr->my_inst.addr, addrs); // and peer's socket addr (they might not know their ip) entity_addr_t socket_addr; @@ -830,7 +824,7 @@ int SimpleMessenger::Pipe::accept() if (connect.connect_seq == existing->connect_seq) { // connection race? - if (peer_addr < msgr->ms_addr || + if (peer_addr < msgr->my_inst.addr || existing->policy.server) { // incoming wins ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq @@ -850,7 +844,7 @@ int SimpleMessenger::Pipe::accept() // our existing outgoing wins ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq << " == " << connect.connect_seq << ", sending WAIT" << dendl; - assert(peer_addr > msgr->ms_addr); + assert(peer_addr > msgr->my_inst.addr); if (!(existing->state == STATE_CONNECTING || existing->state == STATE_OPEN)) lderr(msgr->cct) << "accept race bad state, would send wait, existing=" << existing->state @@ -1099,7 +1093,7 @@ int SimpleMessenger::Pipe::connect() msg.msg_iov = msgvec; msg.msg_iovlen = 1; msglen = msgvec[0].iov_len; - if (do_sendmsg(sd, &msg, msglen)) { + if (do_sendmsg(&msg, msglen)) { ldout(msgr->cct,2) << "connect couldn't write my banner, " << strerror_r(errno, buf, sizeof(buf)) << dendl; goto fail; } @@ -1139,7 +1133,7 @@ int SimpleMessenger::Pipe::connect() if (msgr->need_addr) msgr->learned_addr(peer_addr_for_me); - ::encode(msgr->ms_addr, myaddrbl); + ::encode(msgr->my_inst.addr, myaddrbl); memset(&msg, 0, sizeof(msg)); msgvec[0].iov_base = myaddrbl.c_str(); @@ -1147,11 +1141,11 @@ int SimpleMessenger::Pipe::connect() msg.msg_iov = msgvec; msg.msg_iovlen = 1; msglen = msgvec[0].iov_len; - if (do_sendmsg(sd, &msg, msglen)) { + if (do_sendmsg(&msg, msglen)) { ldout(msgr->cct,2) << "connect couldn't write my addr, " << strerror_r(errno, buf, sizeof(buf)) << dendl; goto fail; } - ldout(msgr->cct,10) << "connect sent my addr " << msgr->ms_addr << dendl; + ldout(msgr->cct,10) << "connect sent my addr " << msgr->my_inst.addr << dendl; while (1) { @@ -1188,7 +1182,7 @@ int SimpleMessenger::Pipe::connect() ldout(msgr->cct,10) << "connect sending gseq=" << gseq << " cseq=" << cseq << " proto=" << connect.protocol_version << dendl; - if (do_sendmsg(sd, &msg, msglen)) { + if (do_sendmsg(&msg, msglen)) { ldout(msgr->cct,2) << "connect couldn't write gseq, cseq, " << strerror_r(errno, buf, sizeof(buf)) << dendl; goto fail; } @@ -2060,7 +2054,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm) return ret; } -int SimpleMessenger::Pipe::do_sendmsg(int sd, struct msghdr *msg, int len, bool more) +int SimpleMessenger::Pipe::do_sendmsg(struct msghdr *msg, int len, bool more) { char buf[80]; @@ -2085,40 +2079,6 @@ int SimpleMessenger::Pipe::do_sendmsg(int sd, struct msghdr *msg, int len, bool return -1; // close enough } - if (0) { - // hex dump - struct iovec *v = msg->msg_iov; - size_t left = r; - size_t vpos = 0; - ldout(msgr->cct,0) << "do_sendmsg wrote " << r << " bytes, hexdump:\n"; - int pos = 0; - int col = 0; - char buf[20]; - while (left > 0) { - if (col == 0) { - snprintf(buf, sizeof(buf), "%05x : ", pos); - *_dout << buf; - } - snprintf(buf, sizeof(buf), " %02x", ((unsigned char*)v->iov_base)[vpos]); - *_dout << buf; - left--; - if (!left) - break; - vpos++; - pos++; - if (vpos == v->iov_len) { - v++; - vpos = 0; - } - col++; - if (col == 16) { - *_dout << "\n"; - col = 0; - } - } - *_dout << dendl; - } - len -= r; if (len == 0) break; @@ -2162,7 +2122,7 @@ int SimpleMessenger::Pipe::write_ack(uint64_t seq) msg.msg_iov = msgvec; msg.msg_iovlen = 2; - if (do_sendmsg(sd, &msg, 1 + sizeof(s), true) < 0) + if (do_sendmsg(&msg, 1 + sizeof(s), true) < 0) return -1; return 0; } @@ -2181,7 +2141,7 @@ int SimpleMessenger::Pipe::write_keepalive() msg.msg_iov = msgvec; msg.msg_iovlen = 1; - if (do_sendmsg(sd, &msg, 1) < 0) + if (do_sendmsg(&msg, 1) < 0) return -1; return 0; } @@ -2261,7 +2221,7 @@ int SimpleMessenger::Pipe::write_message(Message *m) << dendl; if (msg.msg_iovlen >= IOV_MAX-2) { - if (do_sendmsg(sd, &msg, msglen, true)) + if (do_sendmsg(&msg, msglen, true)) goto fail; // and restart the iov @@ -2295,7 +2255,7 @@ int SimpleMessenger::Pipe::write_message(Message *m) msg.msg_iovlen++; // send - if (do_sendmsg(sd, &msg, msglen)) + if (do_sendmsg(&msg, msglen)) goto fail; ret = 0; @@ -2413,7 +2373,7 @@ int SimpleMessenger::start() started = true; if (!did_bind) - ms_addr.nonce = nonce; + my_inst.addr.nonce = nonce; lock.Unlock(); @@ -2432,7 +2392,7 @@ int SimpleMessenger::start() SimpleMessenger::Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr, int type) { assert(lock.is_locked()); - assert(addr != ms_addr); + assert(addr != my_inst.addr); ldout(cct,10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl; @@ -2494,7 +2454,7 @@ Connection *SimpleMessenger::get_connection(const entity_inst_t& dest) { Mutex::Locker l(lock); Pipe *pipe = NULL; - if (ms_addr == dest.addr) { + if (my_inst.addr == dest.addr) { // local pipe = dispatch_queue.local_pipe; } else { @@ -2512,16 +2472,10 @@ Connection *SimpleMessenger::get_connection(const entity_inst_t& dest) } } if (!pipe) { - Policy& policy = get_policy(dest.name.type()); - if (policy.lossy && policy.server) - pipe = NULL; - else - pipe = connect_rank(dest.addr, dest.name.type()); + pipe = connect_rank(dest.addr, dest.name.type()); } } - if (pipe) - return (Connection *)pipe->connection_state->get(); - return NULL; + return (Connection *)pipe->connection_state->get(); } @@ -2541,7 +2495,7 @@ void SimpleMessenger::submit_message(Message *m, const entity_addr_t& dest_addr, lock.Lock(); { // local? - if (ms_addr == dest_addr) { + if (my_inst.addr == dest_addr) { if (!destination_stopped) { // local ldout(cct,20) << "submit_message " << *m << " local" << dendl; @@ -2599,7 +2553,7 @@ int SimpleMessenger::send_keepalive(const entity_inst_t& dest) lock.Lock(); { // local? - if (ms_addr != dest_addr) { + if (my_inst.addr != dest_addr) { // remote. Pipe *pipe = 0; if (rank_pipe.count( dest_proc_addr )) { @@ -2808,13 +2762,13 @@ void SimpleMessenger::mark_disposable(Connection *con) void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) { // be careful here: multiple threads may block here, and readers of - // ms_addr do NOT hold any lock. + // my_inst.addr do NOT hold any lock. lock.Lock(); if (need_addr) { entity_addr_t t = peer_addr_for_me; - t.set_port(ms_addr.get_port()); - ms_addr.addr = t.addr; - ldout(cct,1) << "learned my addr " << ms_addr << dendl; + t.set_port(my_inst.addr.get_port()); + my_inst.addr.addr = t.addr; + ldout(cct,1) << "learned my addr " << my_inst.addr << dendl; need_addr = false; init_local_pipe(); } @@ -2823,6 +2777,6 @@ void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) void SimpleMessenger::init_local_pipe() { - dispatch_queue.local_pipe->connection_state->peer_addr = msgr->ms_addr; + dispatch_queue.local_pipe->connection_state->peer_addr = msgr->my_inst.addr; dispatch_queue.local_pipe->connection_state->peer_type = msgr->my_type; } diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 8ab78c9fc1a..ab31789dc65 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -65,14 +65,7 @@ public: * * @param addr The IP address to set internally. */ - void set_ip(entity_addr_t& addr); - /** - * Retrieve the Messenger's address. - * - * @return A copy of he address this Messenger currently - * believes to be its own. - */ - virtual entity_addr_t get_myaddr(); + void set_addr_unknowns(entity_addr_t& addr); /** * Retrieve the Connection for an endpoint. * @@ -212,7 +205,18 @@ private: int read_message(Message **pm); int write_message(Message *m); - int do_sendmsg(int sd, struct msghdr *msg, int len, bool more=false); + /** + * Write the given data (of length len) to the Pipe's socket. This function + * will loop until all passed data has been written out. + * If more is set, the function will optimize socket writes + * for additional data (by passing the MSG_MORE flag, aka TCP_CORK). + * + * @param msg The msghdr to write out + * @param len The length of the data in msg + * @param more Should be set true if this is one part of a larger message + * @return 0, or -1 on failure (unrecoverable -- close the socket). + */ + int do_sendmsg(struct msghdr *msg, int len, bool more=false); int write_ack(uint64_t s); int write_keepalive(); @@ -467,7 +471,6 @@ private: void dispatch_throttle_release(uint64_t msize); // SimpleMessenger stuff - public: Mutex lock; Cond wait_cond; // for wait() bool did_bind; @@ -475,7 +478,6 @@ private: // where i listen bool need_addr; - entity_addr_t ms_addr; uint64_t nonce; // local @@ -491,6 +493,13 @@ private: Policy default_policy; map<int, Policy> policy_map; // entity_name_t::type -> Policy + // --- pipes --- + set<Pipe*> pipes; + list<Pipe*> pipe_reap_queue; + + Mutex global_seq_lock; + __u32 global_seq; +public: Policy& get_policy(int t) { if (policy_map.count(t)) return policy_map[t]; @@ -498,15 +507,7 @@ private: return default_policy; } - // --- pipes --- - set<Pipe*> pipes; - list<Pipe*> pipe_reap_queue; - - Mutex global_seq_lock; - __u32 global_seq; - Pipe *connect_rank(const entity_addr_t& addr, int type); - virtual void mark_down(const entity_addr_t& addr); virtual void mark_down(Connection *con); virtual void mark_down_on_empty(Connection *con); diff --git a/src/msg/tcp.cc b/src/msg/tcp.cc index 1c0abb1502e..3e2713b91a3 100644 --- a/src/msg/tcp.cc +++ b/src/msg/tcp.cc @@ -12,6 +12,13 @@ /****************** * tcp crap + * + * These functions only propagate unrecoverable errors -- if you see an error, + * close the socket. You can't know what has and hasn't been written out. + */ +/** + * Read the specified amount of data, in a blocking fashion. + * If there is an error, return -1 (unrecoverable). */ int tcp_read(CephContext *cct, int sd, char *buf, int len, int timeout) { @@ -42,6 +49,13 @@ int tcp_read(CephContext *cct, int sd, char *buf, int len, int timeout) return len; } +/** + * Wait for data to become available for reading on the given socket. You + * can specify a timeout in milliseconds, or -1 to wait forever. + * + * @return 0 when data is available, or -1 if there + * is an error (unrecoverable). + */ int tcp_read_wait(int sd, int timeout) { if (sd < 0) @@ -70,9 +84,13 @@ int tcp_read_wait(int sd, int timeout) return 0; } -/* This function can only be called if poll/select says there is - * data available. Otherwise we cannot properly interpret a - * read of 0 bytes. +/** + * Read data off of a given socket. + * + * This function can only be called if poll/select says there is data + * available -- otherwise we can't properly interpret a read of 0 bytes. + * + * @return The number of bytes read, or -1 on error (unrecoverable). */ int tcp_read_nonblocking(CephContext *cct, int sd, char *buf, int len) { @@ -96,6 +114,12 @@ again: return got; } +/** + * Write the given data to the given socket. This function will loop until + * all passed data has been written out. + * + * @return 0, or -1 on error (unrecoverable). + */ int tcp_write(CephContext *cct, int sd, const char *buf, int len) { if (sd < 0) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 5a2c8cd6a33..787d4f7391c 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -2008,7 +2008,7 @@ void OSD::send_boot() int port = cluster_addr.get_port(); cluster_addr = client_messenger->get_myaddr(); cluster_addr.set_port(port); - cluster_messenger->set_ip(cluster_addr); + cluster_messenger->set_addr_unknowns(cluster_addr); dout(10) << " assuming cluster_addr ip matches client_addr" << dendl; } entity_addr_t hb_addr = hbserver_messenger->get_myaddr(); @@ -2016,7 +2016,7 @@ void OSD::send_boot() int port = hb_addr.get_port(); hb_addr = cluster_addr; hb_addr.set_port(port); - hbserver_messenger->set_ip(hb_addr); + hbserver_messenger->set_addr_unknowns(hb_addr); dout(10) << " assuming hb_addr ip matches cluster_addr" << dendl; } MOSDBoot *mboot = new MOSDBoot(superblock, hb_addr, cluster_addr); |