diff options
author | Robert Gemmell <robbie@apache.org> | 2011-11-11 17:31:41 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2011-11-11 17:31:41 +0000 |
commit | 141737680ebf0680f74aabecd263f07106fe4cb6 (patch) | |
tree | da94e3c5e4b49365ec5b540a78c78f41448d75d9 | |
parent | 3fdda0591c7da1dc7700b83c5f9771cccdac4f01 (diff) | |
download | qpid-python-141737680ebf0680f74aabecd263f07106fe4cb6.tar.gz |
QPID-3446: Unregister existing subscriptions when closing the connections [during shutdown], update lock usage in order to avoid deadlock.
Applied patch from Oleksandr Rudyy<orudyy@gmail.com> and myself.
Merged from trunk r1198834
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.14@1200979 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 31 insertions, 17 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index d6a256e2e1..bdf9c7119f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -460,7 +460,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public void queueDeleted(AMQQueue queue) { _deleted.set(true); -// _channel.queueDeleted(queue); } public boolean filtersMessages() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index e428baeebe..e18b453db3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -259,10 +259,11 @@ public class ServerConnection extends Connection implements AMQConnectionModel, public void close(AMQConstant cause, String message) throws AMQException { + closeSubscriptions(); ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; try { - replyCode = ConnectionCloseCode.get(cause.getCode()); + replyCode = ConnectionCloseCode.get(cause.getCode()); } catch (IllegalArgumentException iae) { @@ -399,4 +400,20 @@ public class ServerConnection extends Connection implements AMQConnectionModel, { return _authorizedPrincipal.getName(); } + + @Override + public void closed() + { + closeSubscriptions(); + super.closed(); + } + + private void closeSubscriptions() + { + for (Session ssn : getChannels()) + { + ((ServerSession)ssn).unregisterSubscriptions(); + } + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index fbc3a10e6a..7031502e34 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -415,19 +415,18 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi { queue.unregisterSubscription(sub); } - } catch (AMQException e) { // TODO - _logger.error("Failed to unregister subscription", e); + _logger.error("Failed to unregister subscription :" + e.getMessage(), e); } finally { sub.releaseSendLock(); } } - + public boolean isTransactional() { // this does not look great but there should only be one "non-transactional" @@ -686,12 +685,17 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi { // unregister subscriptions in order to prevent sending of new messages // to subscriptions with closing session + unregisterSubscriptions(); + + super.close(); + } + + void unregisterSubscriptions() + { final Collection<Subscription_0_10> subscriptions = getSubscriptions(); for (Subscription_0_10 subscription_0_10 : subscriptions) { unregister(subscription_0_10); } - - super.close(); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 17bd06538f..c87919b478 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -1261,11 +1261,10 @@ public class ServerSessionDelegate extends SessionDelegate { setThreadSubject(session); - for(Subscription_0_10 sub : getSubscriptions(session)) - { - ((ServerSession)session).unregister(sub); - } - ((ServerSession)session).onClose(); + ServerSession serverSession = (ServerSession)session; + + serverSession.unregisterSubscriptions(); + serverSession.onClose(); } @Override @@ -1274,11 +1273,6 @@ public class ServerSessionDelegate extends SessionDelegate closed(session); } - public Collection<Subscription_0_10> getSubscriptions(Session session) - { - return ((ServerSession)session).getSubscriptions(); - } - private void setThreadSubject(Session session) { final ServerConnection scon = (ServerConnection) session.getConnection(); |