summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-07-18 18:03:48 -0700
committerSage Weil <sage@inktank.com>2013-07-18 18:03:48 -0700
commit0ef5213d01c422e3ac2428d73255ad1531f37c00 (patch)
tree98807e6e7caa1646637af8e8863312bf45f9c393
parent4ed7942997c7e401896be36d5b15b63e1586fffa (diff)
parentfd53d53a4221e9470ec67f64ba4fbe89bf97c91c (diff)
downloadceph-0ef5213d01c422e3ac2428d73255ad1531f37c00.tar.gz
Merge pull request #445 from ceph/wip-osd-leaks
fix msgr issues causing osd leaks on shutdown Reviewed-by: Greg Farnum <greg@inktank.com>
-rw-r--r--src/client/Client.cc2
-rw-r--r--src/mon/Monitor.cc19
-rw-r--r--src/msg/Messenger.h69
-rw-r--r--src/msg/SimpleMessenger.cc9
-rw-r--r--src/msg/SimpleMessenger.h217
-rw-r--r--src/osd/OSD.cc6
-rw-r--r--src/osd/ReplicatedPG.cc1
7 files changed, 75 insertions, 248 deletions
diff --git a/src/client/Client.cc b/src/client/Client.cc
index ae7ddf65db4..eb7502c1530 100644
--- a/src/client/Client.cc
+++ b/src/client/Client.cc
@@ -1882,7 +1882,7 @@ void Client::handle_mds_map(MMDSMap* m)
int newstate = mdsmap->get_state(p->first);
if (!mdsmap->is_up(p->first) ||
mdsmap->get_inst(p->first) != p->second->inst) {
- messenger->mark_down(p->second->inst.addr);
+ messenger->mark_down(p->second->con);
if (mdsmap->is_up(p->first))
p->second->inst = mdsmap->get_inst(p->first);
} else if (oldstate == newstate)
diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc
index 9ae3e93a111..90750dd7b11 100644
--- a/src/mon/Monitor.cc
+++ b/src/mon/Monitor.cc
@@ -3242,9 +3242,6 @@ bool Monitor::ms_handle_reset(Connection *con)
{
dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl;
- if (is_shutdown())
- return false;
-
// ignore lossless monitor sessions
if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
return false;
@@ -3253,6 +3250,12 @@ bool Monitor::ms_handle_reset(Connection *con)
if (!s)
return false;
+ // break any con <-> session ref cycle
+ s->con->set_priv(NULL);
+
+ if (is_shutdown())
+ return false;
+
Mutex::Locker l(lock);
dout(10) << "reset/close on session " << s->inst << dendl;
@@ -3474,16 +3477,16 @@ void Monitor::tick()
continue;
if (!s->until.is_zero() && s->until < now) {
- dout(10) << " trimming session " << s->inst
+ dout(10) << " trimming session " << s->con << " " << s->inst
<< " (until " << s->until << " < now " << now << ")" << dendl;
- messenger->mark_down(s->inst.addr);
+ messenger->mark_down(s->con);
remove_session(s);
} else if (!exited_quorum.is_zero()) {
if (now > (exited_quorum + 2 * g_conf->mon_lease)) {
// boot the client Session because we've taken too long getting back in
- dout(10) << " trimming session " << s->inst
- << " because we've been out of quorum too long" << dendl;
- messenger->mark_down(s->inst.addr);
+ dout(10) << " trimming session " << s->con << " " << s->inst
+ << " because we've been out of quorum too long" << dendl;
+ messenger->mark_down(s->con);
remove_session(s);
}
}
diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h
index 28643e10767..42feaf227df 100644
--- a/src/msg/Messenger.h
+++ b/src/msg/Messenger.h
@@ -329,17 +329,13 @@ public:
*/
virtual int bind(const entity_addr_t& bind_addr) = 0;
/**
- * This is an optional function for implementations
- * to override. For those implementations that do
- * implement it, this function shall perform a full
- * restart of the Messenger component, whatever that means.
- * Other entities who connect to this Messenger post-rebind()
- * should perceive it as a new entity which they have not
- * previously contacted, and it MUST bind to a different
- * address than it did previously. If avoid_port is non-zero
- * it must additionally avoid that port.
- *
- * @param avoid_port An additional port to avoid binding to.
+ * This function performs a full restart of the Messenger component,
+ * whatever that means. Other entities who connect to this
+ * Messenger post-rebind() should perceive it as a new entity which
+ * they have not previously contacted, and it MUST bind to a
+ * different address than it did previously.
+ *
+ * @param avoid_ports Additional port to avoid binding to.
*/
virtual int rebind(const set<int>& avoid_ports) { return -EOPNOTSUPP; }
/**
@@ -392,6 +388,9 @@ public:
* when you pass it in.
* @param dest The entity to send the Message to.
*
+ * DEPRECATED: please do not use this interface for any new code;
+ * use the Connection* variant.
+ *
* @return 0 on success, or -errno on failure.
*/
virtual int send_message(Message *m, const entity_inst_t& dest) = 0;
@@ -421,6 +420,9 @@ public:
* when you pass it in.
* @param dest The entity to send the Message to.
*
+ * DEPRECATED: please do not use this interface for any new code;
+ * use the Connection* variant.
+ *
* @return 0.
*/
virtual int lazy_send_message(Message *m, const entity_inst_t& dest) = 0;
@@ -476,22 +478,33 @@ public:
*/
virtual int send_keepalive(Connection *con) = 0;
/**
- * Mark down a Connection to a remote. This will cause us to
- * discard our outgoing queue for them, and if they try
- * to reconnect they will discard their queue when we
- * inform them of the session reset. If there is no
- * Connection to the given dest, it is a no-op.
- * It does not generate any notifications to the Dispatcher.
+ * Mark down a Connection to a remote.
+ *
+ * This will cause us to discard our outgoing queue for them, and if
+ * reset detection is enabled in the policy and the endpoint tries
+ * to reconnect they will discard their queue when we inform them of
+ * the session reset.
+ *
+ * If there is no Connection to the given dest, it is a no-op.
+ *
+ * This generates a RESET notification to the Dispatcher.
+ *
+ * DEPRECATED: please do not use this interface for any new code;
+ * use the Connection* variant.
*
* @param a The address to mark down.
*/
virtual void mark_down(const entity_addr_t& a) = 0;
/**
- * Mark down the given Connection. This will cause us to
- * discard its outgoing queue, and if the endpoint tries
- * to reconnect they will discard their queue when we
- * inform them of the session reset.
+ * Mark down the given Connection.
+ *
+ * This will cause us to discard its outgoing queue, and if reset
+ * detection is enabled in the policy and the endpoint tries to
+ * reconnect they will discard their queue when we inform them of
+ * the session reset.
+ *
* If the Connection* is NULL, this is a no-op.
+ *
* It does not generate any notifications to the Dispatcher.
*
* @param con The Connection to mark down.
@@ -501,6 +514,14 @@ public:
mark_down(con.get());
}
/**
+ * Mark all the existing Connections down. This is equivalent
+ * to iterating over all Connections and calling mark_down()
+ * on each.
+ *
+ * This will generate a RESET event for each closed connections.
+ */
+ virtual void mark_down_all() = 0;
+ /**
* Unlike mark_down, this function will try and deliver
* all messages before ending the connection, and it will use
* the Pipe's existing semantics to do so. Once the Messages
@@ -530,12 +551,6 @@ public:
*/
virtual void mark_disposable(Connection *con) = 0;
/**
- * Mark all the existing Connections down. This is equivalent
- * to iterating over all Connections and calling mark_down()
- * on each.
- */
- virtual void mark_down_all() = 0;
- /**
* @} // Connection Management
*/
protected:
diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc
index afee0952630..441ed432af0 100644
--- a/src/msg/SimpleMessenger.cc
+++ b/src/msg/SimpleMessenger.cc
@@ -599,9 +599,12 @@ void SimpleMessenger::mark_down(const entity_addr_t& addr)
p->pipe_lock.Lock();
p->stop();
if (p->connection_state) {
- // do not generate a reset event for the caller in this case,
- // since they asked for it.
- p->connection_state->clear_pipe(p);
+ // generate a reset event for the caller in this case, even
+ // though they asked for it, since this is the addr-based (and
+ // not Connection* based) interface
+ ConnectionRef con = p->connection_state;
+ if (con && con->clear_pipe(p))
+ dispatch_queue.queue_reset(con.get());
}
p->pipe_lock.Unlock();
} else {
diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h
index 4538b0f18bc..6860c6c21a3 100644
--- a/src/msg/SimpleMessenger.h
+++ b/src/msg/SimpleMessenger.h
@@ -92,28 +92,12 @@ public:
/** @defgroup Accessors
* @{
*/
- /**
- * Set the IP this SimpleMessenger is using. This is useful if it's unset
- * but another SimpleMessenger on the same interface has already learned its
- * IP. Of course, this function does not change the port, since the
- * SimpleMessenger always knows the correct setting for that.
- * If the SimpleMesssenger's IP is already set, this function is a no-op.
- *
- * @param addr The IP address to set internally.
- */
void set_addr_unknowns(entity_addr_t& addr);
- /**
- * Get the number of Messages which the SimpleMessenger has received
- * but not yet dispatched.
- * @return The length of the Dispatch queue.
- */
+
int get_dispatch_queue_len() {
return dispatch_queue.get_queue_len();
}
- /**
- * Get age of oldest undelivered message
- * (0 if the queue is empty)
- */
+
double get_dispatch_queue_max_age(utime_t now) {
return dispatch_queue.get_max_age(now);
}
@@ -123,52 +107,21 @@ public:
* @defgroup Configuration functions
* @{
*/
- /**
- * Set the cluster protocol in use by this daemon.
- * This is an init-time function and cannot be called after calling
- * start() or bind().
- *
- * @param p The cluster protocol to use. Defined externally.
- */
void set_cluster_protocol(int p) {
assert(!started && !did_bind);
cluster_protocol = p;
}
- /**
- * Set a policy which is applied to all peers who do not have a type-specific
- * Policy.
- * This is an init-time function and cannot be called after calling
- * start() or bind().
- *
- * @param p The Policy to apply.
- */
+
void set_default_policy(Policy p) {
Mutex::Locker l(policy_lock);
default_policy = p;
}
- /**
- * Set a policy which is applied to all peers of the given type.
- * This is an init-time function and cannot be called after calling
- * start() or bind().
- *
- * @param type The peer type this policy applies to.
- * @param p The policy to apply.
- */
+
void set_policy(int type, Policy p) {
Mutex::Locker l(policy_lock);
policy_map[type] = p;
}
- /**
- * Set a Throttler which is applied to all Messages from the given
- * type of peer.
- * This is an init-time function and cannot be called after calling
- * start() or bind().
- *
- * @param type The peer type this Throttler will apply to.
- * @param t The Throttler to apply. SimpleMessenger does not take
- * ownership of this pointer, but you must not destroy it before
- * you destroy SimpleMessenger.
- */
+
void set_policy_throttlers(int type, Throttle *byte_throttle, Throttle *msg_throttle) {
Mutex::Locker l(policy_lock);
if (policy_map.count(type)) {
@@ -179,50 +132,18 @@ public:
default_policy.throttler_messages = msg_throttle;
}
}
- /**
- * Bind the SimpleMessenger to a specific address. If bind_addr
- * is not completely filled in the system will use the
- * valid portions and cycle through the unset ones (eg, the port)
- * in an unspecified order.
- *
- * @param bind_addr The address to bind to.
- * @return 0 on success, or -1 if the SimpleMessenger is already running, or
- * -errno if an error is returned from a system call.
- */
+
int bind(const entity_addr_t& bind_addr);
- /**
- * This function performs a full restart of the SimpleMessenger. It
- * calls mark_down_all() and binds to a new port. (If avoid_port
- * is set it additionally avoids that specific port.)
- *
- * @param avoid_port An additional port to avoid binding to.
- */
int rebind(const set<int>& avoid_ports);
+
/** @} Configuration functions */
/**
* @defgroup Startup/Shutdown
* @{
*/
- /**
- * Start up the SimpleMessenger. Create worker threads as necessary.
- * @return 0
- */
virtual int start();
- /**
- * Wait until the SimpleMessenger is ready to shut down (triggered by a
- * call to the shutdown() function), then handle
- * stopping its threads and cleaning up Pipes and various queues.
- * Once this function returns, the SimpleMessenger is fully shut down and
- * can be deleted.
- */
virtual void wait();
- /**
- * Tell the SimpleMessenger to shut down. This function does not
- * complete the shutdown; it just triggers it.
- *
- * @return 0
- */
virtual int shutdown();
/** @} // Startup/Shutdown */
@@ -231,60 +152,18 @@ public:
* @defgroup Messaging
* @{
*/
- /**
- * Queue the given Message for the given entity.
- * Success in this function does not guarantee Message delivery, only
- * success in queueing the Message. Other guarantees may be provided based
- * on the Connection policy associated with the dest.
- *
- * @param m The Message to send. The Messenger consumes a single reference
- * when you pass it in.
- * @param dest The entity to send the Message to.
- *
- * @return 0 on success, or -EINVAL if the dest's address is empty.
- */
virtual int send_message(Message *m, const entity_inst_t& dest) {
return _send_message(m, dest, false);
}
- /**
- * Queue the given Message to send out on the given Connection.
- * Success in this function does not guarantee Message delivery, only
- * success in queueing the Message (or else a guaranteed-safe drop).
- * Other guarantees may be provided based on the Connection policy.
- *
- * @param m The Message to send. The Messenger consumes a single reference
- * when you pass it in.
- * @param con The Connection to send the Message out on.
- *
- * @return 0 on success.
- */
+
virtual int send_message(Message *m, Connection *con) {
return _send_message(m, con, false);
}
- /**
- * Lazily queue the given Message for the given entity. Unlike with
- * send_message(), lazy_send_message() will not establish a
- * Connection if none exists, re-establish the connection if it
- * has broken, or queue the Message if the connection is broken.
- *
- * @param m The Message to send. The Messenger consumes a single reference
- * when you pass it in.
- * @param dest The entity to send the Message to.
- *
- * @return 0 on success, or -EINVAL if the dest's address is empty.
- */
+
virtual int lazy_send_message(Message *m, const entity_inst_t& dest) {
return _send_message(m, dest, true);
}
- /**
- * Lazily queue the given Message for the given Connection.
- *
- * @param m The Message to send. The Messenger consumes a single reference
- * when you pass it in.
- * @param con The Connection to send the Message out on.
- *
- * @return 0.
- */
+
virtual int lazy_send_message(Message *m, Connection *con) {
return _send_message(m, con, true);
}
@@ -294,90 +173,14 @@ public:
* @defgroup Connection Management
* @{
*/
- /**
- * Get the Connection object associated with a given entity. If a
- * Connection does not exist, create one and establish a logical connection.
- * The caller owns a reference when this returns. Call ->put() when you're
- * done!
- *
- * @param dest The entity to get a connection for.
- * @return The requested Connection, as a pointer whose reference you own.
- */
virtual ConnectionRef get_connection(const entity_inst_t& dest);
virtual ConnectionRef get_loopback_connection();
- /**
- * Send a "keepalive" ping to the given dest, if it has a working Connection.
- * If the Messenger doesn't already have a Connection, or if the underlying
- * connection has broken, this function does nothing.
- *
- * @param dest The entity to send the keepalive to.
- * @return 0, or -EINVAL if we don't already have a Connection, or
- * -EPIPE if a Pipe for the dest doesn't exist.
- */
virtual int send_keepalive(const entity_inst_t& addr);
- /**
- * Send a "keepalive" ping along the given Connection, if it's working.
- * If the underlying connection has broken, this function does nothing.
- *
- * @param dest The entity to send the keepalive to.
- * @return 0, or -EPIPE if the Connection doesn't have a running Pipe.
- */
virtual int send_keepalive(Connection *con);
- /**
- * Mark down a Connection to a remote. This will cause us to
- * discard our outgoing queue for them, and if they try
- * to reconnect they will discard their queue when we
- * inform them of the session reset. If there is no
- * Connection to the given dest, it is a no-op.
- * It does not generate any notifications to the Dispatcher.
- *
- * @param a The address to mark down.
- */
virtual void mark_down(const entity_addr_t& addr);
- /**
- * Mark down the given Connection. This will cause us to
- * discard its outgoing queue, and if the endpoint tries
- * to reconnect they will discard their queue when we
- * inform them of the session reset.
- * It does not generate any notifications to the Dispatcher.
- *
- * @param con The Connection to mark down.
- */
virtual void mark_down(Connection *con);
- /**
- * Unlike mark_down, this function will try and deliver
- * all messages before ending the connection, and it will use
- * the Pipe's existing semantics to do so. Once the Messages
- * all been sent out (and acked, if using reliable delivery)
- * the Connection will be closed.
- * This function means that you will get standard delivery to endpoints,
- * and then the Connection will be cleaned up. It does not
- * generate any notifications to the Dispatcher.
- *
- * @param con The Connection to mark down.
- */
virtual void mark_down_on_empty(Connection *con);
- /**
- * Mark a Connection as "disposable", setting it to lossy
- * (regardless of initial Policy). Unlike mark_down_on_empty()
- * this does not immediately close the Connection once
- * Messages have been delivered, so as long as there are no errors you can
- * continue to receive responses; but it will not attempt
- * to reconnect for message delivery or preserve your old
- * delivery semantics, either.
- * You can compose this with mark_down, in which case the Pipe
- * will make sure to send all Messages and wait for an ack before
- * closing, but if there's a failure it will simply shut down. It
- * does not generate any notifications to the Dispatcher.
- *
- * @param con The Connection to mark as disposable.
- */
virtual void mark_disposable(Connection *con);
- /**
- * Mark all the existing Connections down. This is equivalent
- * to iterating over all Connections and calling mark_down()
- * on each.
- */
virtual void mark_down_all();
/** @} // Connection Management */
protected:
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 2cecd60c18b..2dc59b32f4f 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -3371,8 +3371,8 @@ void OSD::ms_handle_connect(Connection *con)
bool OSD::ms_handle_reset(Connection *con)
{
- dout(1) << "OSD::ms_handle_reset()" << dendl;
OSD::Session *session = (OSD::Session *)con->get_priv();
+ dout(1) << "ms_handle_reset con " << con << " session " << session << dendl;
if (!session)
return false;
session->wstate.reset();
@@ -5575,7 +5575,9 @@ bool OSD::require_same_or_newer_map(OpRequestRef op, epoch_t epoch)
<< " msg was " << m->get_source_inst().addr
<< " expected " << (osdmap->have_inst(from) ? osdmap->get_cluster_addr(from) : entity_addr_t())
<< dendl;
- cluster_messenger->mark_down(m->get_connection());
+ ConnectionRef con = m->get_connection();
+ con->set_priv(NULL); // break ref <-> session cycle, if any
+ cluster_messenger->mark_down(con.get());
return false;
}
}
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 14708e38cd9..453fdacfb76 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -821,6 +821,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
if (osd_op.op.op == CEPH_OSD_OP_LIST_SNAPS &&
m->get_snapid() != CEPH_SNAPDIR) {
dout(10) << "LIST_SNAPS with incorrect context" << dendl;
+ put_object_context(obc);
osd->reply_op_error(op, -EINVAL);
return;
}