summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@newdream.net>2012-04-03 13:44:29 -0700
committerSage Weil <sage@newdream.net>2012-04-03 13:44:29 -0700
commit1ef37ab8cbbba23aa169ba55ee43ff6354e55e09 (patch)
tree33a58276825de27b40072eaa29071e394c51fe8f
parenta31efd9c99a6319e672be73c39fae7804cba3f5f (diff)
parent756621d50132f0d619ebf31919e6ac1b89d30eb4 (diff)
downloadceph-1ef37ab8cbbba23aa169ba55ee43ff6354e55e09.tar.gz
Merge remote-tracking branch 'gh/msgr-api-changes'
Reviewed-by: Sage Weil <sage@newdream.net>
-rw-r--r--src/common/buffer.cc15
-rw-r--r--src/include/buffer.h1
-rw-r--r--src/msg/Dispatcher.h14
-rw-r--r--src/msg/Messenger.h54
-rw-r--r--src/msg/SimpleMessenger.cc126
-rw-r--r--src/msg/SimpleMessenger.h39
-rw-r--r--src/msg/tcp.cc30
-rw-r--r--src/osd/OSD.cc4
8 files changed, 160 insertions, 123 deletions
diff --git a/src/common/buffer.cc b/src/common/buffer.cc
index 893d8a7382f..a6ad8c0d736 100644
--- a/src/common/buffer.cc
+++ b/src/common/buffer.cc
@@ -611,6 +611,21 @@ bool buffer_track_alloc = get_env_bool("CEPH_BUFFER_TRACK");
other.last_p = other.begin();
}
+ bool buffer::list::contents_equal(ceph::buffer::list& other)
+ {
+ if (length() != other.length())
+ return false;
+ bufferlist::iterator me = begin();
+ bufferlist::iterator him = other.begin();
+ while (!me.end()) {
+ if (*me != *him)
+ return false;
+ ++me;
+ ++him;
+ }
+ return true;
+ }
+
bool buffer::list::is_page_aligned() const
{
for (std::list<ptr>::const_iterator it = _buffers.begin();
diff --git a/src/include/buffer.h b/src/include/buffer.h
index 12dec986ea9..65c101f6f96 100644
--- a/src/include/buffer.h
+++ b/src/include/buffer.h
@@ -325,6 +325,7 @@ public:
#endif
return _len;
}
+ bool contents_equal(buffer::list& other);
bool is_page_aligned() const;
bool is_n_page_sized() const;
diff --git a/src/msg/Dispatcher.h b/src/msg/Dispatcher.h
index 16cd03d36c8..d2c9db85531 100644
--- a/src/msg/Dispatcher.h
+++ b/src/msg/Dispatcher.h
@@ -37,14 +37,20 @@ public:
virtual void ms_handle_connect(Connection *con) { };
/*
- * on any connection reset.
* this indicates that the ordered+reliable delivery semantics have
- * been violated. messages may have been lost.
+ * been violated. Messages may have been lost due to a fault
+ * in the network connection.
+ * Only called on lossy Connections or those you've
+ * designated mark_down_on_empty().
*/
virtual bool ms_handle_reset(Connection *con) = 0;
- // on deliberate reset of connection by remote
- // implies incoming messages dropped; possibly/probably some of our previous outgoing too.
+ /**
+ * This indicates that the ordered+reliable delivery semantics
+ * have been violated because the remote somehow reset.
+ * It implies that incoming messages were dropped, and
+ * probably some of our previous outgoing messages were too.
+ */
virtual void ms_handle_remote_reset(Connection *con) = 0;
// authorization handshake provides mutual authentication of peers.
diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h
index 61e35e1bff2..730fa1a68ec 100644
--- a/src/msg/Messenger.h
+++ b/src/msg/Messenger.h
@@ -74,7 +74,7 @@ private:
protected:
/// the "name" of the local daemon. eg client.99
- entity_name_t _my_name;
+ entity_inst_t my_inst;
int default_send_priority;
/// set to true once the Messenger has started, and set to false on shutdown
bool started;
@@ -82,10 +82,11 @@ protected:
public:
CephContext *cct;
Messenger(CephContext *cct_, entity_name_t w)
- : default_send_priority(CEPH_MSG_PRIO_DEFAULT), started(false),
+ : my_inst(),
+ default_send_priority(CEPH_MSG_PRIO_DEFAULT), started(false),
cct(cct_)
{
- _my_name = w;
+ my_inst.name = w;
}
virtual ~Messenger() {}
@@ -93,10 +94,25 @@ protected:
}
// accessors
- entity_name_t get_myname() { return _my_name; }
- virtual entity_addr_t get_myaddr() = 0;
- virtual void set_ip(entity_addr_t &addr) = 0;
- entity_inst_t get_myinst() { return entity_inst_t(get_myname(), get_myaddr()); }
+ const entity_name_t& get_myname() { return my_inst.name; }
+ /**
+ * Retrieve the Messenger's address.
+ *
+ * @return A copy of the address this Messenger currently
+ * believes to be its own.
+ */
+ const entity_addr_t& get_myaddr() { return my_inst.addr; }
+ /**
+ * Set the unknown address components for this Messenger.
+ * This is useful if the Messenger doesn't know its full address just by
+ * binding, but another Messenger on the same interface has already learned
+ * its full address. This function does not fill in known address elements,
+ * cause a rebind, or do anything of that sort.
+ *
+ * @param addr The address to use as a template.
+ */
+ virtual void set_addr_unknowns(entity_addr_t &addr) = 0;
+ const entity_inst_t& get_myinst() { return my_inst; }
/**
* Set the name of the local entity. The name is reported to others and
@@ -105,7 +121,7 @@ protected:
*
* @param m The name to set.
*/
- void set_myname(const entity_name_t m) { _my_name = m; }
+ void set_myname(const entity_name_t m) { my_inst.name = m; }
/**
* Set the default send priority
@@ -244,13 +260,33 @@ protected:
virtual int lazy_send_message(Message *m, Connection *con) = 0;
virtual int send_keepalive(const entity_inst_t& dest) = 0;
virtual int send_keepalive(Connection *con) = 0;
-
+ /**
+ * Mark down a connection to a remote. This will cause us to
+ * discard our outgoing queue for them, and if they try
+ * to reconnect they will discard their queue when we
+ * inform them of the session reset.
+ * It does not generate any notifications to he Dispatcher.
+ */
virtual void mark_down(const entity_addr_t& a) = 0;
virtual void mark_down(Connection *con) = 0;
+ /**
+ * Unlike mark_down, this function will try and deliver
+ * all messages before ending the connection. But the
+ * messages are not delivered reliably, and once they've
+ * all been sent out the Connection will be closed and
+ * generate an ms_handle_reset notification to the
+ * Dispatcher.
+ */
virtual void mark_down_on_empty(Connection *con) = 0;
virtual void mark_disposable(Connection *con) = 0;
virtual void mark_down_all() = 0;
+ /**
+ * Get the Connection object associated with a given entity. If a
+ * Connection does not exist, create one and establish a logical connection.
+ *
+ * @param dest The entity to get a connection for.
+ */
virtual Connection *get_connection(const entity_inst_t& dest) = 0;
virtual int rebind(int avoid_port) { return -EOPNOTSUPP; }
diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc
index a1e1cc4edc7..fd930318e7b 100644
--- a/src/msg/SimpleMessenger.cc
+++ b/src/msg/SimpleMessenger.cc
@@ -46,7 +46,7 @@
#undef dout_prefix
#define dout_prefix _prefix(_dout, msgr)
static ostream& _prefix(std::ostream *_dout, SimpleMessenger *msgr) {
- return *_dout << "-- " << msgr->ms_addr << " ";
+ return *_dout << "-- " << msgr->get_myaddr() << " ";
}
@@ -145,20 +145,20 @@ int SimpleMessenger::Accepter::bind(entity_addr_t &bind_addr, int avoid_port1, i
return -errno;
}
- msgr->ms_addr = bind_addr;
- if (msgr->ms_addr != entity_addr_t())
+ msgr->my_inst.addr = bind_addr;
+ if (msgr->my_inst.addr != entity_addr_t())
msgr->need_addr = false;
else
msgr->need_addr = true;
- if (msgr->ms_addr.get_port() == 0) {
- msgr->ms_addr = listen_addr;
- msgr->ms_addr.nonce = msgr->nonce;
+ if (msgr->my_inst.addr.get_port() == 0) {
+ msgr->my_inst.addr = listen_addr;
+ msgr->my_inst.addr.nonce = msgr->nonce;
}
msgr->init_local_pipe();
- ldout(msgr->cct,1) << "accepter.bind ms_addr is " << msgr->ms_addr << " need_addr=" << msgr->need_addr << dendl;
+ ldout(msgr->cct,1) << "accepter.bind my_inst.addr is " << msgr->my_inst.addr << " need_addr=" << msgr->need_addr << dendl;
msgr->did_bind = true;
return 0;
}
@@ -170,7 +170,7 @@ int SimpleMessenger::Accepter::rebind(int avoid_port)
stop();
- entity_addr_t addr = msgr->ms_addr;
+ entity_addr_t addr = msgr->my_inst.addr;
int old_port = addr.get_port();
addr.set_port(0);
@@ -487,22 +487,16 @@ int SimpleMessenger::lazy_send_message(Message *m, const entity_inst_t& dest)
return 0;
}
-entity_addr_t SimpleMessenger::get_myaddr()
-{
- entity_addr_t a = ms_addr;
- return a;
-}
-
/**
- * If ms_addr doesn't have an IP set, this function
+ * If my_inst.addr doesn't have an IP set, this function
* will fill it in from the passed addr. Otherwise it does nothing and returns.
*/
-void SimpleMessenger::set_ip(entity_addr_t &addr)
+void SimpleMessenger::set_addr_unknowns(entity_addr_t &addr)
{
- if (ms_addr.is_blank_ip()) {
- int port = ms_addr.get_port();
- ms_addr.addr = addr.addr;
- ms_addr.set_port(port);
+ if (my_inst.addr.is_blank_ip()) {
+ int port = my_inst.addr.get_port();
+ my_inst.addr.addr = addr.addr;
+ my_inst.addr.set_port(port);
}
}
@@ -539,7 +533,7 @@ int SimpleMessenger::get_proto_version(int peer_type, bool connect)
#undef dout_prefix
#define dout_prefix _pipe_prefix(_dout)
ostream& SimpleMessenger::Pipe::_pipe_prefix(std::ostream *_dout) {
- return *_dout << "-- " << msgr->ms_addr << " >> " << peer_addr << " pipe(" << this
+ return *_dout << "-- " << msgr->my_inst.addr << " >> " << peer_addr << " pipe(" << this
<< " sd=" << sd
<< " pgs=" << peer_global_seq
<< " cs=" << connect_seq
@@ -626,7 +620,7 @@ int SimpleMessenger::Pipe::accept()
// and my addr
bufferlist addrs;
- ::encode(msgr->ms_addr, addrs);
+ ::encode(msgr->my_inst.addr, addrs);
// and peer's socket addr (they might not know their ip)
entity_addr_t socket_addr;
@@ -830,7 +824,7 @@ int SimpleMessenger::Pipe::accept()
if (connect.connect_seq == existing->connect_seq) {
// connection race?
- if (peer_addr < msgr->ms_addr ||
+ if (peer_addr < msgr->my_inst.addr ||
existing->policy.server) {
// incoming wins
ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
@@ -850,7 +844,7 @@ int SimpleMessenger::Pipe::accept()
// our existing outgoing wins
ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
<< " == " << connect.connect_seq << ", sending WAIT" << dendl;
- assert(peer_addr > msgr->ms_addr);
+ assert(peer_addr > msgr->my_inst.addr);
if (!(existing->state == STATE_CONNECTING ||
existing->state == STATE_OPEN))
lderr(msgr->cct) << "accept race bad state, would send wait, existing=" << existing->state
@@ -1099,7 +1093,7 @@ int SimpleMessenger::Pipe::connect()
msg.msg_iov = msgvec;
msg.msg_iovlen = 1;
msglen = msgvec[0].iov_len;
- if (do_sendmsg(sd, &msg, msglen)) {
+ if (do_sendmsg(&msg, msglen)) {
ldout(msgr->cct,2) << "connect couldn't write my banner, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
goto fail;
}
@@ -1139,7 +1133,7 @@ int SimpleMessenger::Pipe::connect()
if (msgr->need_addr)
msgr->learned_addr(peer_addr_for_me);
- ::encode(msgr->ms_addr, myaddrbl);
+ ::encode(msgr->my_inst.addr, myaddrbl);
memset(&msg, 0, sizeof(msg));
msgvec[0].iov_base = myaddrbl.c_str();
@@ -1147,11 +1141,11 @@ int SimpleMessenger::Pipe::connect()
msg.msg_iov = msgvec;
msg.msg_iovlen = 1;
msglen = msgvec[0].iov_len;
- if (do_sendmsg(sd, &msg, msglen)) {
+ if (do_sendmsg(&msg, msglen)) {
ldout(msgr->cct,2) << "connect couldn't write my addr, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
goto fail;
}
- ldout(msgr->cct,10) << "connect sent my addr " << msgr->ms_addr << dendl;
+ ldout(msgr->cct,10) << "connect sent my addr " << msgr->my_inst.addr << dendl;
while (1) {
@@ -1188,7 +1182,7 @@ int SimpleMessenger::Pipe::connect()
ldout(msgr->cct,10) << "connect sending gseq=" << gseq << " cseq=" << cseq
<< " proto=" << connect.protocol_version << dendl;
- if (do_sendmsg(sd, &msg, msglen)) {
+ if (do_sendmsg(&msg, msglen)) {
ldout(msgr->cct,2) << "connect couldn't write gseq, cseq, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
goto fail;
}
@@ -2060,7 +2054,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
return ret;
}
-int SimpleMessenger::Pipe::do_sendmsg(int sd, struct msghdr *msg, int len, bool more)
+int SimpleMessenger::Pipe::do_sendmsg(struct msghdr *msg, int len, bool more)
{
char buf[80];
@@ -2085,40 +2079,6 @@ int SimpleMessenger::Pipe::do_sendmsg(int sd, struct msghdr *msg, int len, bool
return -1; // close enough
}
- if (0) {
- // hex dump
- struct iovec *v = msg->msg_iov;
- size_t left = r;
- size_t vpos = 0;
- ldout(msgr->cct,0) << "do_sendmsg wrote " << r << " bytes, hexdump:\n";
- int pos = 0;
- int col = 0;
- char buf[20];
- while (left > 0) {
- if (col == 0) {
- snprintf(buf, sizeof(buf), "%05x : ", pos);
- *_dout << buf;
- }
- snprintf(buf, sizeof(buf), " %02x", ((unsigned char*)v->iov_base)[vpos]);
- *_dout << buf;
- left--;
- if (!left)
- break;
- vpos++;
- pos++;
- if (vpos == v->iov_len) {
- v++;
- vpos = 0;
- }
- col++;
- if (col == 16) {
- *_dout << "\n";
- col = 0;
- }
- }
- *_dout << dendl;
- }
-
len -= r;
if (len == 0) break;
@@ -2162,7 +2122,7 @@ int SimpleMessenger::Pipe::write_ack(uint64_t seq)
msg.msg_iov = msgvec;
msg.msg_iovlen = 2;
- if (do_sendmsg(sd, &msg, 1 + sizeof(s), true) < 0)
+ if (do_sendmsg(&msg, 1 + sizeof(s), true) < 0)
return -1;
return 0;
}
@@ -2181,7 +2141,7 @@ int SimpleMessenger::Pipe::write_keepalive()
msg.msg_iov = msgvec;
msg.msg_iovlen = 1;
- if (do_sendmsg(sd, &msg, 1) < 0)
+ if (do_sendmsg(&msg, 1) < 0)
return -1;
return 0;
}
@@ -2261,7 +2221,7 @@ int SimpleMessenger::Pipe::write_message(Message *m)
<< dendl;
if (msg.msg_iovlen >= IOV_MAX-2) {
- if (do_sendmsg(sd, &msg, msglen, true))
+ if (do_sendmsg(&msg, msglen, true))
goto fail;
// and restart the iov
@@ -2295,7 +2255,7 @@ int SimpleMessenger::Pipe::write_message(Message *m)
msg.msg_iovlen++;
// send
- if (do_sendmsg(sd, &msg, msglen))
+ if (do_sendmsg(&msg, msglen))
goto fail;
ret = 0;
@@ -2413,7 +2373,7 @@ int SimpleMessenger::start()
started = true;
if (!did_bind)
- ms_addr.nonce = nonce;
+ my_inst.addr.nonce = nonce;
lock.Unlock();
@@ -2432,7 +2392,7 @@ int SimpleMessenger::start()
SimpleMessenger::Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr, int type)
{
assert(lock.is_locked());
- assert(addr != ms_addr);
+ assert(addr != my_inst.addr);
ldout(cct,10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl;
@@ -2494,7 +2454,7 @@ Connection *SimpleMessenger::get_connection(const entity_inst_t& dest)
{
Mutex::Locker l(lock);
Pipe *pipe = NULL;
- if (ms_addr == dest.addr) {
+ if (my_inst.addr == dest.addr) {
// local
pipe = dispatch_queue.local_pipe;
} else {
@@ -2512,16 +2472,10 @@ Connection *SimpleMessenger::get_connection(const entity_inst_t& dest)
}
}
if (!pipe) {
- Policy& policy = get_policy(dest.name.type());
- if (policy.lossy && policy.server)
- pipe = NULL;
- else
- pipe = connect_rank(dest.addr, dest.name.type());
+ pipe = connect_rank(dest.addr, dest.name.type());
}
}
- if (pipe)
- return (Connection *)pipe->connection_state->get();
- return NULL;
+ return (Connection *)pipe->connection_state->get();
}
@@ -2541,7 +2495,7 @@ void SimpleMessenger::submit_message(Message *m, const entity_addr_t& dest_addr,
lock.Lock();
{
// local?
- if (ms_addr == dest_addr) {
+ if (my_inst.addr == dest_addr) {
if (!destination_stopped) {
// local
ldout(cct,20) << "submit_message " << *m << " local" << dendl;
@@ -2599,7 +2553,7 @@ int SimpleMessenger::send_keepalive(const entity_inst_t& dest)
lock.Lock();
{
// local?
- if (ms_addr != dest_addr) {
+ if (my_inst.addr != dest_addr) {
// remote.
Pipe *pipe = 0;
if (rank_pipe.count( dest_proc_addr )) {
@@ -2808,13 +2762,13 @@ void SimpleMessenger::mark_disposable(Connection *con)
void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
{
// be careful here: multiple threads may block here, and readers of
- // ms_addr do NOT hold any lock.
+ // my_inst.addr do NOT hold any lock.
lock.Lock();
if (need_addr) {
entity_addr_t t = peer_addr_for_me;
- t.set_port(ms_addr.get_port());
- ms_addr.addr = t.addr;
- ldout(cct,1) << "learned my addr " << ms_addr << dendl;
+ t.set_port(my_inst.addr.get_port());
+ my_inst.addr.addr = t.addr;
+ ldout(cct,1) << "learned my addr " << my_inst.addr << dendl;
need_addr = false;
init_local_pipe();
}
@@ -2823,6 +2777,6 @@ void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
void SimpleMessenger::init_local_pipe()
{
- dispatch_queue.local_pipe->connection_state->peer_addr = msgr->ms_addr;
+ dispatch_queue.local_pipe->connection_state->peer_addr = msgr->my_inst.addr;
dispatch_queue.local_pipe->connection_state->peer_type = msgr->my_type;
}
diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h
index 8ab78c9fc1a..ab31789dc65 100644
--- a/src/msg/SimpleMessenger.h
+++ b/src/msg/SimpleMessenger.h
@@ -65,14 +65,7 @@ public:
*
* @param addr The IP address to set internally.
*/
- void set_ip(entity_addr_t& addr);
- /**
- * Retrieve the Messenger's address.
- *
- * @return A copy of he address this Messenger currently
- * believes to be its own.
- */
- virtual entity_addr_t get_myaddr();
+ void set_addr_unknowns(entity_addr_t& addr);
/**
* Retrieve the Connection for an endpoint.
*
@@ -212,7 +205,18 @@ private:
int read_message(Message **pm);
int write_message(Message *m);
- int do_sendmsg(int sd, struct msghdr *msg, int len, bool more=false);
+ /**
+ * Write the given data (of length len) to the Pipe's socket. This function
+ * will loop until all passed data has been written out.
+ * If more is set, the function will optimize socket writes
+ * for additional data (by passing the MSG_MORE flag, aka TCP_CORK).
+ *
+ * @param msg The msghdr to write out
+ * @param len The length of the data in msg
+ * @param more Should be set true if this is one part of a larger message
+ * @return 0, or -1 on failure (unrecoverable -- close the socket).
+ */
+ int do_sendmsg(struct msghdr *msg, int len, bool more=false);
int write_ack(uint64_t s);
int write_keepalive();
@@ -467,7 +471,6 @@ private:
void dispatch_throttle_release(uint64_t msize);
// SimpleMessenger stuff
- public:
Mutex lock;
Cond wait_cond; // for wait()
bool did_bind;
@@ -475,7 +478,6 @@ private:
// where i listen
bool need_addr;
- entity_addr_t ms_addr;
uint64_t nonce;
// local
@@ -491,6 +493,13 @@ private:
Policy default_policy;
map<int, Policy> policy_map; // entity_name_t::type -> Policy
+ // --- pipes ---
+ set<Pipe*> pipes;
+ list<Pipe*> pipe_reap_queue;
+
+ Mutex global_seq_lock;
+ __u32 global_seq;
+public:
Policy& get_policy(int t) {
if (policy_map.count(t))
return policy_map[t];
@@ -498,15 +507,7 @@ private:
return default_policy;
}
- // --- pipes ---
- set<Pipe*> pipes;
- list<Pipe*> pipe_reap_queue;
-
- Mutex global_seq_lock;
- __u32 global_seq;
-
Pipe *connect_rank(const entity_addr_t& addr, int type);
-
virtual void mark_down(const entity_addr_t& addr);
virtual void mark_down(Connection *con);
virtual void mark_down_on_empty(Connection *con);
diff --git a/src/msg/tcp.cc b/src/msg/tcp.cc
index 1c0abb1502e..3e2713b91a3 100644
--- a/src/msg/tcp.cc
+++ b/src/msg/tcp.cc
@@ -12,6 +12,13 @@
/******************
* tcp crap
+ *
+ * These functions only propagate unrecoverable errors -- if you see an error,
+ * close the socket. You can't know what has and hasn't been written out.
+ */
+/**
+ * Read the specified amount of data, in a blocking fashion.
+ * If there is an error, return -1 (unrecoverable).
*/
int tcp_read(CephContext *cct, int sd, char *buf, int len, int timeout)
{
@@ -42,6 +49,13 @@ int tcp_read(CephContext *cct, int sd, char *buf, int len, int timeout)
return len;
}
+/**
+ * Wait for data to become available for reading on the given socket. You
+ * can specify a timeout in milliseconds, or -1 to wait forever.
+ *
+ * @return 0 when data is available, or -1 if there
+ * is an error (unrecoverable).
+ */
int tcp_read_wait(int sd, int timeout)
{
if (sd < 0)
@@ -70,9 +84,13 @@ int tcp_read_wait(int sd, int timeout)
return 0;
}
-/* This function can only be called if poll/select says there is
- * data available. Otherwise we cannot properly interpret a
- * read of 0 bytes.
+/**
+ * Read data off of a given socket.
+ *
+ * This function can only be called if poll/select says there is data
+ * available -- otherwise we can't properly interpret a read of 0 bytes.
+ *
+ * @return The number of bytes read, or -1 on error (unrecoverable).
*/
int tcp_read_nonblocking(CephContext *cct, int sd, char *buf, int len)
{
@@ -96,6 +114,12 @@ again:
return got;
}
+/**
+ * Write the given data to the given socket. This function will loop until
+ * all passed data has been written out.
+ *
+ * @return 0, or -1 on error (unrecoverable).
+ */
int tcp_write(CephContext *cct, int sd, const char *buf, int len)
{
if (sd < 0)
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 5a2c8cd6a33..787d4f7391c 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -2008,7 +2008,7 @@ void OSD::send_boot()
int port = cluster_addr.get_port();
cluster_addr = client_messenger->get_myaddr();
cluster_addr.set_port(port);
- cluster_messenger->set_ip(cluster_addr);
+ cluster_messenger->set_addr_unknowns(cluster_addr);
dout(10) << " assuming cluster_addr ip matches client_addr" << dendl;
}
entity_addr_t hb_addr = hbserver_messenger->get_myaddr();
@@ -2016,7 +2016,7 @@ void OSD::send_boot()
int port = hb_addr.get_port();
hb_addr = cluster_addr;
hb_addr.set_port(port);
- hbserver_messenger->set_ip(hb_addr);
+ hbserver_messenger->set_addr_unknowns(hb_addr);
dout(10) << " assuming hb_addr ip matches cluster_addr" << dendl;
}
MOSDBoot *mboot = new MOSDBoot(superblock, hb_addr, cluster_addr);