diff options
author | Robert Gemmell <robbie@apache.org> | 2011-11-07 17:40:03 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2011-11-07 17:40:03 +0000 |
commit | cd60f6126d09308f925a88d3168925964938cee1 (patch) | |
tree | 5f1a87610281cd3644b3d15c3111beb3c216a2bf | |
parent | a996985f58a8843104c57c8e2cde185bfc143480 (diff) | |
download | qpid-python-cd60f6126d09308f925a88d3168925964938cee1.tar.gz |
QPID-3446: Unregister existing subscriptions when closing the connections [during shutdown], update lock usage in order to avoid deadlock.
Applied patch from Oleksandr Rudyy<orudyy@gmail.com> and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1198834 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 31 insertions, 17 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index d6a256e2e1..bdf9c7119f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -460,7 +460,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public void queueDeleted(AMQQueue queue) { _deleted.set(true); -// _channel.queueDeleted(queue); } public boolean filtersMessages() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index e428baeebe..e18b453db3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -259,10 +259,11 @@ public class ServerConnection extends Connection implements AMQConnectionModel, public void close(AMQConstant cause, String message) throws AMQException { + closeSubscriptions(); ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; try { - replyCode = ConnectionCloseCode.get(cause.getCode()); + replyCode = ConnectionCloseCode.get(cause.getCode()); } catch (IllegalArgumentException iae) { @@ -399,4 +400,20 @@ public class ServerConnection extends Connection implements AMQConnectionModel, { return _authorizedPrincipal.getName(); } + + @Override + public void closed() + { + closeSubscriptions(); + super.closed(); + } + + private void closeSubscriptions() + { + for (Session ssn : getChannels()) + { + ((ServerSession)ssn).unregisterSubscriptions(); + } + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index fbc3a10e6a..7031502e34 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -415,19 +415,18 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi { queue.unregisterSubscription(sub); } - } catch (AMQException e) { // TODO - _logger.error("Failed to unregister subscription", e); + _logger.error("Failed to unregister subscription :" + e.getMessage(), e); } finally { sub.releaseSendLock(); } } - + public boolean isTransactional() { // this does not look great but there should only be one "non-transactional" @@ -686,12 +685,17 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi { // unregister subscriptions in order to prevent sending of new messages // to subscriptions with closing session + unregisterSubscriptions(); + + super.close(); + } + + void unregisterSubscriptions() + { final Collection<Subscription_0_10> subscriptions = getSubscriptions(); for (Subscription_0_10 subscription_0_10 : subscriptions) { unregister(subscription_0_10); } - - super.close(); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 17bd06538f..c87919b478 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -1261,11 +1261,10 @@ public class ServerSessionDelegate extends SessionDelegate { setThreadSubject(session); - for(Subscription_0_10 sub : getSubscriptions(session)) - { - ((ServerSession)session).unregister(sub); - } - ((ServerSession)session).onClose(); + ServerSession serverSession = (ServerSession)session; + + serverSession.unregisterSubscriptions(); + serverSession.onClose(); } @Override @@ -1274,11 +1273,6 @@ public class ServerSessionDelegate extends SessionDelegate closed(session); } - public Collection<Subscription_0_10> getSubscriptions(Session session) - { - return ((ServerSession)session).getSubscriptions(); - } - private void setThreadSubject(Session session) { final ServerConnection scon = (ServerConnection) session.getConnection(); |