diff options
author | Keith Wall <kwall@apache.org> | 2011-08-25 10:46:39 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2011-08-25 10:46:39 +0000 |
commit | 1dc1ece5aec7ba37654a19cbf1d860ec809cced9 (patch) | |
tree | d14dacffa5c765ccba2e2bc4d7f04187ca8d140e | |
parent | 99a4b6301a06eaf6681e9593695b5354fbc3fb0b (diff) | |
download | qpid-python-1dc1ece5aec7ba37654a19cbf1d860ec809cced9.tar.gz |
QPID-3452: Broker now unregisters any remaining subscriptions on receipt of SessionDetach to prevent SubFlushRunner and QueueRunner sending erroneous frames causing a ProtocolViolationException on the client.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1161492 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java | 21 | ||||
-rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java | 5 |
2 files changed, 23 insertions, 3 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java index f65cad23e7..b3acf48676 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.transport; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -36,6 +37,7 @@ import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.AuthenticationResult; import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; +import org.apache.qpid.server.subscription.Subscription_0_10; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.*; @@ -194,4 +196,23 @@ public class ServerConnectionDelegate extends ServerDelegate { return ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount(); } + + @Override public void sessionDetach(Connection conn, SessionDetach dtc) + { + // To ensure a clean detach, we unregister any remaining subscriptions. Unregister ensures + // that any in-progress delivery (SubFlushRunner/QueueRunner) is completed before the unregister + // completes. + unregisterAllSubscriptions(conn, dtc); + super.sessionDetach(conn, dtc); + } + + private void unregisterAllSubscriptions(Connection conn, SessionDetach dtc) + { + final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel()); + final Collection<Subscription_0_10> subs = ssn.getSubscriptions(); + for (Subscription_0_10 subscription_0_10 : subs) + { + ssn.unregister(subscription_0_10); + } + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index eef6c047d3..82a6cdaa67 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -35,7 +35,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslServer; @@ -406,7 +405,7 @@ public class Connection extends ConnectionInvoker else { throw new ProtocolViolationException( - "Received frames for an already dettached session", null); + "Received frames for an already detached session", null); } } @@ -455,7 +454,7 @@ public class Connection extends ConnectionInvoker } } - protected Session getSession(int channel) + public Session getSession(int channel) { synchronized (lock) { |