summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-11-11 17:31:41 +0000
committerRobert Gemmell <robbie@apache.org>2011-11-11 17:31:41 +0000
commit141737680ebf0680f74aabecd263f07106fe4cb6 (patch)
treeda94e3c5e4b49365ec5b540a78c78f41448d75d9
parent3fdda0591c7da1dc7700b83c5f9771cccdac4f01 (diff)
downloadqpid-python-141737680ebf0680f74aabecd263f07106fe4cb6.tar.gz
QPID-3446: Unregister existing subscriptions when closing the connections [during shutdown], update lock usage in order to avoid deadlock.
Applied patch from Oleksandr Rudyy<orudyy@gmail.com> and myself. Merged from trunk r1198834 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.14@1200979 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java19
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java14
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java14
4 files changed, 31 insertions, 17 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index d6a256e2e1..bdf9c7119f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -460,7 +460,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public void queueDeleted(AMQQueue queue)
{
_deleted.set(true);
-// _channel.queueDeleted(queue);
}
public boolean filtersMessages()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index e428baeebe..e18b453db3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -259,10 +259,11 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
public void close(AMQConstant cause, String message) throws AMQException
{
+ closeSubscriptions();
ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
try
{
- replyCode = ConnectionCloseCode.get(cause.getCode());
+ replyCode = ConnectionCloseCode.get(cause.getCode());
}
catch (IllegalArgumentException iae)
{
@@ -399,4 +400,20 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
{
return _authorizedPrincipal.getName();
}
+
+ @Override
+ public void closed()
+ {
+ closeSubscriptions();
+ super.closed();
+ }
+
+ private void closeSubscriptions()
+ {
+ for (Session ssn : getChannels())
+ {
+ ((ServerSession)ssn).unregisterSubscriptions();
+ }
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index fbc3a10e6a..7031502e34 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -415,19 +415,18 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
{
queue.unregisterSubscription(sub);
}
-
}
catch (AMQException e)
{
// TODO
- _logger.error("Failed to unregister subscription", e);
+ _logger.error("Failed to unregister subscription :" + e.getMessage(), e);
}
finally
{
sub.releaseSendLock();
}
}
-
+
public boolean isTransactional()
{
// this does not look great but there should only be one "non-transactional"
@@ -686,12 +685,17 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
{
// unregister subscriptions in order to prevent sending of new messages
// to subscriptions with closing session
+ unregisterSubscriptions();
+
+ super.close();
+ }
+
+ void unregisterSubscriptions()
+ {
final Collection<Subscription_0_10> subscriptions = getSubscriptions();
for (Subscription_0_10 subscription_0_10 : subscriptions)
{
unregister(subscription_0_10);
}
-
- super.close();
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 17bd06538f..c87919b478 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -1261,11 +1261,10 @@ public class ServerSessionDelegate extends SessionDelegate
{
setThreadSubject(session);
- for(Subscription_0_10 sub : getSubscriptions(session))
- {
- ((ServerSession)session).unregister(sub);
- }
- ((ServerSession)session).onClose();
+ ServerSession serverSession = (ServerSession)session;
+
+ serverSession.unregisterSubscriptions();
+ serverSession.onClose();
}
@Override
@@ -1274,11 +1273,6 @@ public class ServerSessionDelegate extends SessionDelegate
closed(session);
}
- public Collection<Subscription_0_10> getSubscriptions(Session session)
- {
- return ((ServerSession)session).getSubscriptions();
- }
-
private void setThreadSubject(Session session)
{
final ServerConnection scon = (ServerConnection) session.getConnection();