summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-09-30 15:54:47 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-09-30 15:54:47 +0000
commitbd6fe63ebc1943d28e9267032d49eb38242c9a37 (patch)
tree01a976bd3c77336ed32ab010e4f810d16333d9f6 /java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
parentad491c7e599ac276a058674ef2bbb3bf1875b010 (diff)
downloadqpid-python-bd6fe63ebc1943d28e9267032d49eb38242c9a37.tar.gz
QPID-2116 : Ensure that AMQChannel correctly notifies all running Subscription deliveries that the Channel has been suspended. This is down by taking out the subcription sendLock on each subscription on this Channel.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@820316 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java55
1 files changed, 51 insertions, 4 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index add5e64ee8..180f0a992c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -783,16 +783,31 @@ public class AMQChannel
return _unacknowledgedMessageMap;
}
-
+ /**
+ * Called from the ChannelFlowHandler to suspend this Channel
+ * @param suspended boolean, should this Channel be suspended
+ */
public void setSuspended(boolean suspended)
{
-
-
boolean wasSuspended = _suspended.getAndSet(suspended);
if (wasSuspended != suspended)
{
- _actor.message(_logSubject, ChannelMessages.CHN_1002(suspended ? "Stopped" : "Started"));
+ // Log Flow Started before we start the subscriptions
+ if (!suspended)
+ {
+ _actor.message(_logSubject, ChannelMessages.CHN_1002("Started"));
+ }
+ // This section takes two different approaches to perform to perform
+ // the same function. Ensuring that the Subscription has taken note
+ // of the change in Channel State
+
+ // Here we have become unsuspended and so we ask each the queue to
+ // perform an Async delivery for each of the subscriptions in this
+ // Channel. The alternative would be to ensure that the subscription
+ // had received the change in suspension state. That way the logic
+ // behind decieding to start an async delivery was located with the
+ // Subscription.
if (wasSuspended)
{
// may need to deliver queued messages
@@ -801,6 +816,38 @@ public class AMQChannel
s.getQueue().deliverAsync(s);
}
}
+
+
+ // Here we have become suspended so we need to ensure that each of
+ // the Subscriptions has noticed this change so that we can be sure
+ // they are not still sending messages. Again the code here is a
+ // very simplistic approach to ensure that the change of suspension
+ // has been noticed by each of the Subscriptions. Unlike the above
+ // case we don't actually need to do anything else.
+ if (!wasSuspended)
+ {
+ // may need to deliver queued messages
+ for (Subscription s : _tag2SubscriptionMap.values())
+ {
+ try
+ {
+ s.getSendLock();
+ }
+ finally
+ {
+ s.releaseSendLock();
+ }
+ }
+ }
+
+
+ // Log Suspension only after we have confirmed all suspensions are
+ // stopped.
+ if (suspended)
+ {
+ _actor.message(_logSubject, ChannelMessages.CHN_1002("Stopped"));
+ }
+
}
}