diff options
author | Sage Weil <sage@inktank.com> | 2013-07-18 18:03:48 -0700 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-07-18 18:03:48 -0700 |
commit | 0ef5213d01c422e3ac2428d73255ad1531f37c00 (patch) | |
tree | 98807e6e7caa1646637af8e8863312bf45f9c393 | |
parent | 4ed7942997c7e401896be36d5b15b63e1586fffa (diff) | |
parent | fd53d53a4221e9470ec67f64ba4fbe89bf97c91c (diff) | |
download | ceph-0ef5213d01c422e3ac2428d73255ad1531f37c00.tar.gz |
Merge pull request #445 from ceph/wip-osd-leaks
fix msgr issues causing osd leaks on shutdown
Reviewed-by: Greg Farnum <greg@inktank.com>
-rw-r--r-- | src/client/Client.cc | 2 | ||||
-rw-r--r-- | src/mon/Monitor.cc | 19 | ||||
-rw-r--r-- | src/msg/Messenger.h | 69 | ||||
-rw-r--r-- | src/msg/SimpleMessenger.cc | 9 | ||||
-rw-r--r-- | src/msg/SimpleMessenger.h | 217 | ||||
-rw-r--r-- | src/osd/OSD.cc | 6 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 1 |
7 files changed, 75 insertions, 248 deletions
diff --git a/src/client/Client.cc b/src/client/Client.cc index ae7ddf65db4..eb7502c1530 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -1882,7 +1882,7 @@ void Client::handle_mds_map(MMDSMap* m) int newstate = mdsmap->get_state(p->first); if (!mdsmap->is_up(p->first) || mdsmap->get_inst(p->first) != p->second->inst) { - messenger->mark_down(p->second->inst.addr); + messenger->mark_down(p->second->con); if (mdsmap->is_up(p->first)) p->second->inst = mdsmap->get_inst(p->first); } else if (oldstate == newstate) diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 9ae3e93a111..90750dd7b11 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -3242,9 +3242,6 @@ bool Monitor::ms_handle_reset(Connection *con) { dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl; - if (is_shutdown()) - return false; - // ignore lossless monitor sessions if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) return false; @@ -3253,6 +3250,12 @@ bool Monitor::ms_handle_reset(Connection *con) if (!s) return false; + // break any con <-> session ref cycle + s->con->set_priv(NULL); + + if (is_shutdown()) + return false; + Mutex::Locker l(lock); dout(10) << "reset/close on session " << s->inst << dendl; @@ -3474,16 +3477,16 @@ void Monitor::tick() continue; if (!s->until.is_zero() && s->until < now) { - dout(10) << " trimming session " << s->inst + dout(10) << " trimming session " << s->con << " " << s->inst << " (until " << s->until << " < now " << now << ")" << dendl; - messenger->mark_down(s->inst.addr); + messenger->mark_down(s->con); remove_session(s); } else if (!exited_quorum.is_zero()) { if (now > (exited_quorum + 2 * g_conf->mon_lease)) { // boot the client Session because we've taken too long getting back in - dout(10) << " trimming session " << s->inst - << " because we've been out of quorum too long" << dendl; - messenger->mark_down(s->inst.addr); + dout(10) << " trimming session " << s->con << " " << s->inst + << " because we've been out of quorum too long" << dendl; + messenger->mark_down(s->con); remove_session(s); } } diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 28643e10767..42feaf227df 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -329,17 +329,13 @@ public: */ virtual int bind(const entity_addr_t& bind_addr) = 0; /** - * This is an optional function for implementations - * to override. For those implementations that do - * implement it, this function shall perform a full - * restart of the Messenger component, whatever that means. - * Other entities who connect to this Messenger post-rebind() - * should perceive it as a new entity which they have not - * previously contacted, and it MUST bind to a different - * address than it did previously. If avoid_port is non-zero - * it must additionally avoid that port. - * - * @param avoid_port An additional port to avoid binding to. + * This function performs a full restart of the Messenger component, + * whatever that means. Other entities who connect to this + * Messenger post-rebind() should perceive it as a new entity which + * they have not previously contacted, and it MUST bind to a + * different address than it did previously. + * + * @param avoid_ports Additional port to avoid binding to. */ virtual int rebind(const set<int>& avoid_ports) { return -EOPNOTSUPP; } /** @@ -392,6 +388,9 @@ public: * when you pass it in. * @param dest The entity to send the Message to. * + * DEPRECATED: please do not use this interface for any new code; + * use the Connection* variant. + * * @return 0 on success, or -errno on failure. */ virtual int send_message(Message *m, const entity_inst_t& dest) = 0; @@ -421,6 +420,9 @@ public: * when you pass it in. * @param dest The entity to send the Message to. * + * DEPRECATED: please do not use this interface for any new code; + * use the Connection* variant. + * * @return 0. */ virtual int lazy_send_message(Message *m, const entity_inst_t& dest) = 0; @@ -476,22 +478,33 @@ public: */ virtual int send_keepalive(Connection *con) = 0; /** - * Mark down a Connection to a remote. This will cause us to - * discard our outgoing queue for them, and if they try - * to reconnect they will discard their queue when we - * inform them of the session reset. If there is no - * Connection to the given dest, it is a no-op. - * It does not generate any notifications to the Dispatcher. + * Mark down a Connection to a remote. + * + * This will cause us to discard our outgoing queue for them, and if + * reset detection is enabled in the policy and the endpoint tries + * to reconnect they will discard their queue when we inform them of + * the session reset. + * + * If there is no Connection to the given dest, it is a no-op. + * + * This generates a RESET notification to the Dispatcher. + * + * DEPRECATED: please do not use this interface for any new code; + * use the Connection* variant. * * @param a The address to mark down. */ virtual void mark_down(const entity_addr_t& a) = 0; /** - * Mark down the given Connection. This will cause us to - * discard its outgoing queue, and if the endpoint tries - * to reconnect they will discard their queue when we - * inform them of the session reset. + * Mark down the given Connection. + * + * This will cause us to discard its outgoing queue, and if reset + * detection is enabled in the policy and the endpoint tries to + * reconnect they will discard their queue when we inform them of + * the session reset. + * * If the Connection* is NULL, this is a no-op. + * * It does not generate any notifications to the Dispatcher. * * @param con The Connection to mark down. @@ -501,6 +514,14 @@ public: mark_down(con.get()); } /** + * Mark all the existing Connections down. This is equivalent + * to iterating over all Connections and calling mark_down() + * on each. + * + * This will generate a RESET event for each closed connections. + */ + virtual void mark_down_all() = 0; + /** * Unlike mark_down, this function will try and deliver * all messages before ending the connection, and it will use * the Pipe's existing semantics to do so. Once the Messages @@ -530,12 +551,6 @@ public: */ virtual void mark_disposable(Connection *con) = 0; /** - * Mark all the existing Connections down. This is equivalent - * to iterating over all Connections and calling mark_down() - * on each. - */ - virtual void mark_down_all() = 0; - /** * @} // Connection Management */ protected: diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index afee0952630..441ed432af0 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -599,9 +599,12 @@ void SimpleMessenger::mark_down(const entity_addr_t& addr) p->pipe_lock.Lock(); p->stop(); if (p->connection_state) { - // do not generate a reset event for the caller in this case, - // since they asked for it. - p->connection_state->clear_pipe(p); + // generate a reset event for the caller in this case, even + // though they asked for it, since this is the addr-based (and + // not Connection* based) interface + ConnectionRef con = p->connection_state; + if (con && con->clear_pipe(p)) + dispatch_queue.queue_reset(con.get()); } p->pipe_lock.Unlock(); } else { diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 4538b0f18bc..6860c6c21a3 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -92,28 +92,12 @@ public: /** @defgroup Accessors * @{ */ - /** - * Set the IP this SimpleMessenger is using. This is useful if it's unset - * but another SimpleMessenger on the same interface has already learned its - * IP. Of course, this function does not change the port, since the - * SimpleMessenger always knows the correct setting for that. - * If the SimpleMesssenger's IP is already set, this function is a no-op. - * - * @param addr The IP address to set internally. - */ void set_addr_unknowns(entity_addr_t& addr); - /** - * Get the number of Messages which the SimpleMessenger has received - * but not yet dispatched. - * @return The length of the Dispatch queue. - */ + int get_dispatch_queue_len() { return dispatch_queue.get_queue_len(); } - /** - * Get age of oldest undelivered message - * (0 if the queue is empty) - */ + double get_dispatch_queue_max_age(utime_t now) { return dispatch_queue.get_max_age(now); } @@ -123,52 +107,21 @@ public: * @defgroup Configuration functions * @{ */ - /** - * Set the cluster protocol in use by this daemon. - * This is an init-time function and cannot be called after calling - * start() or bind(). - * - * @param p The cluster protocol to use. Defined externally. - */ void set_cluster_protocol(int p) { assert(!started && !did_bind); cluster_protocol = p; } - /** - * Set a policy which is applied to all peers who do not have a type-specific - * Policy. - * This is an init-time function and cannot be called after calling - * start() or bind(). - * - * @param p The Policy to apply. - */ + void set_default_policy(Policy p) { Mutex::Locker l(policy_lock); default_policy = p; } - /** - * Set a policy which is applied to all peers of the given type. - * This is an init-time function and cannot be called after calling - * start() or bind(). - * - * @param type The peer type this policy applies to. - * @param p The policy to apply. - */ + void set_policy(int type, Policy p) { Mutex::Locker l(policy_lock); policy_map[type] = p; } - /** - * Set a Throttler which is applied to all Messages from the given - * type of peer. - * This is an init-time function and cannot be called after calling - * start() or bind(). - * - * @param type The peer type this Throttler will apply to. - * @param t The Throttler to apply. SimpleMessenger does not take - * ownership of this pointer, but you must not destroy it before - * you destroy SimpleMessenger. - */ + void set_policy_throttlers(int type, Throttle *byte_throttle, Throttle *msg_throttle) { Mutex::Locker l(policy_lock); if (policy_map.count(type)) { @@ -179,50 +132,18 @@ public: default_policy.throttler_messages = msg_throttle; } } - /** - * Bind the SimpleMessenger to a specific address. If bind_addr - * is not completely filled in the system will use the - * valid portions and cycle through the unset ones (eg, the port) - * in an unspecified order. - * - * @param bind_addr The address to bind to. - * @return 0 on success, or -1 if the SimpleMessenger is already running, or - * -errno if an error is returned from a system call. - */ + int bind(const entity_addr_t& bind_addr); - /** - * This function performs a full restart of the SimpleMessenger. It - * calls mark_down_all() and binds to a new port. (If avoid_port - * is set it additionally avoids that specific port.) - * - * @param avoid_port An additional port to avoid binding to. - */ int rebind(const set<int>& avoid_ports); + /** @} Configuration functions */ /** * @defgroup Startup/Shutdown * @{ */ - /** - * Start up the SimpleMessenger. Create worker threads as necessary. - * @return 0 - */ virtual int start(); - /** - * Wait until the SimpleMessenger is ready to shut down (triggered by a - * call to the shutdown() function), then handle - * stopping its threads and cleaning up Pipes and various queues. - * Once this function returns, the SimpleMessenger is fully shut down and - * can be deleted. - */ virtual void wait(); - /** - * Tell the SimpleMessenger to shut down. This function does not - * complete the shutdown; it just triggers it. - * - * @return 0 - */ virtual int shutdown(); /** @} // Startup/Shutdown */ @@ -231,60 +152,18 @@ public: * @defgroup Messaging * @{ */ - /** - * Queue the given Message for the given entity. - * Success in this function does not guarantee Message delivery, only - * success in queueing the Message. Other guarantees may be provided based - * on the Connection policy associated with the dest. - * - * @param m The Message to send. The Messenger consumes a single reference - * when you pass it in. - * @param dest The entity to send the Message to. - * - * @return 0 on success, or -EINVAL if the dest's address is empty. - */ virtual int send_message(Message *m, const entity_inst_t& dest) { return _send_message(m, dest, false); } - /** - * Queue the given Message to send out on the given Connection. - * Success in this function does not guarantee Message delivery, only - * success in queueing the Message (or else a guaranteed-safe drop). - * Other guarantees may be provided based on the Connection policy. - * - * @param m The Message to send. The Messenger consumes a single reference - * when you pass it in. - * @param con The Connection to send the Message out on. - * - * @return 0 on success. - */ + virtual int send_message(Message *m, Connection *con) { return _send_message(m, con, false); } - /** - * Lazily queue the given Message for the given entity. Unlike with - * send_message(), lazy_send_message() will not establish a - * Connection if none exists, re-establish the connection if it - * has broken, or queue the Message if the connection is broken. - * - * @param m The Message to send. The Messenger consumes a single reference - * when you pass it in. - * @param dest The entity to send the Message to. - * - * @return 0 on success, or -EINVAL if the dest's address is empty. - */ + virtual int lazy_send_message(Message *m, const entity_inst_t& dest) { return _send_message(m, dest, true); } - /** - * Lazily queue the given Message for the given Connection. - * - * @param m The Message to send. The Messenger consumes a single reference - * when you pass it in. - * @param con The Connection to send the Message out on. - * - * @return 0. - */ + virtual int lazy_send_message(Message *m, Connection *con) { return _send_message(m, con, true); } @@ -294,90 +173,14 @@ public: * @defgroup Connection Management * @{ */ - /** - * Get the Connection object associated with a given entity. If a - * Connection does not exist, create one and establish a logical connection. - * The caller owns a reference when this returns. Call ->put() when you're - * done! - * - * @param dest The entity to get a connection for. - * @return The requested Connection, as a pointer whose reference you own. - */ virtual ConnectionRef get_connection(const entity_inst_t& dest); virtual ConnectionRef get_loopback_connection(); - /** - * Send a "keepalive" ping to the given dest, if it has a working Connection. - * If the Messenger doesn't already have a Connection, or if the underlying - * connection has broken, this function does nothing. - * - * @param dest The entity to send the keepalive to. - * @return 0, or -EINVAL if we don't already have a Connection, or - * -EPIPE if a Pipe for the dest doesn't exist. - */ virtual int send_keepalive(const entity_inst_t& addr); - /** - * Send a "keepalive" ping along the given Connection, if it's working. - * If the underlying connection has broken, this function does nothing. - * - * @param dest The entity to send the keepalive to. - * @return 0, or -EPIPE if the Connection doesn't have a running Pipe. - */ virtual int send_keepalive(Connection *con); - /** - * Mark down a Connection to a remote. This will cause us to - * discard our outgoing queue for them, and if they try - * to reconnect they will discard their queue when we - * inform them of the session reset. If there is no - * Connection to the given dest, it is a no-op. - * It does not generate any notifications to the Dispatcher. - * - * @param a The address to mark down. - */ virtual void mark_down(const entity_addr_t& addr); - /** - * Mark down the given Connection. This will cause us to - * discard its outgoing queue, and if the endpoint tries - * to reconnect they will discard their queue when we - * inform them of the session reset. - * It does not generate any notifications to the Dispatcher. - * - * @param con The Connection to mark down. - */ virtual void mark_down(Connection *con); - /** - * Unlike mark_down, this function will try and deliver - * all messages before ending the connection, and it will use - * the Pipe's existing semantics to do so. Once the Messages - * all been sent out (and acked, if using reliable delivery) - * the Connection will be closed. - * This function means that you will get standard delivery to endpoints, - * and then the Connection will be cleaned up. It does not - * generate any notifications to the Dispatcher. - * - * @param con The Connection to mark down. - */ virtual void mark_down_on_empty(Connection *con); - /** - * Mark a Connection as "disposable", setting it to lossy - * (regardless of initial Policy). Unlike mark_down_on_empty() - * this does not immediately close the Connection once - * Messages have been delivered, so as long as there are no errors you can - * continue to receive responses; but it will not attempt - * to reconnect for message delivery or preserve your old - * delivery semantics, either. - * You can compose this with mark_down, in which case the Pipe - * will make sure to send all Messages and wait for an ack before - * closing, but if there's a failure it will simply shut down. It - * does not generate any notifications to the Dispatcher. - * - * @param con The Connection to mark as disposable. - */ virtual void mark_disposable(Connection *con); - /** - * Mark all the existing Connections down. This is equivalent - * to iterating over all Connections and calling mark_down() - * on each. - */ virtual void mark_down_all(); /** @} // Connection Management */ protected: diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 2cecd60c18b..2dc59b32f4f 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -3371,8 +3371,8 @@ void OSD::ms_handle_connect(Connection *con) bool OSD::ms_handle_reset(Connection *con) { - dout(1) << "OSD::ms_handle_reset()" << dendl; OSD::Session *session = (OSD::Session *)con->get_priv(); + dout(1) << "ms_handle_reset con " << con << " session " << session << dendl; if (!session) return false; session->wstate.reset(); @@ -5575,7 +5575,9 @@ bool OSD::require_same_or_newer_map(OpRequestRef op, epoch_t epoch) << " msg was " << m->get_source_inst().addr << " expected " << (osdmap->have_inst(from) ? osdmap->get_cluster_addr(from) : entity_addr_t()) << dendl; - cluster_messenger->mark_down(m->get_connection()); + ConnectionRef con = m->get_connection(); + con->set_priv(NULL); // break ref <-> session cycle, if any + cluster_messenger->mark_down(con.get()); return false; } } diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 14708e38cd9..453fdacfb76 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -821,6 +821,7 @@ void ReplicatedPG::do_op(OpRequestRef op) if (osd_op.op.op == CEPH_OSD_OP_LIST_SNAPS && m->get_snapid() != CEPH_SNAPDIR) { dout(10) << "LIST_SNAPS with incorrect context" << dendl; + put_object_context(obc); osd->reply_op_error(op, -EINVAL); return; } |