summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java55
1 files changed, 51 insertions, 4 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index add5e64ee8..180f0a992c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -783,16 +783,31 @@ public class AMQChannel
return _unacknowledgedMessageMap;
}
-
+ /**
+ * Called from the ChannelFlowHandler to suspend this Channel
+ * @param suspended boolean, should this Channel be suspended
+ */
public void setSuspended(boolean suspended)
{
-
-
boolean wasSuspended = _suspended.getAndSet(suspended);
if (wasSuspended != suspended)
{
- _actor.message(_logSubject, ChannelMessages.CHN_1002(suspended ? "Stopped" : "Started"));
+ // Log Flow Started before we start the subscriptions
+ if (!suspended)
+ {
+ _actor.message(_logSubject, ChannelMessages.CHN_1002("Started"));
+ }
+ // This section takes two different approaches to perform to perform
+ // the same function. Ensuring that the Subscription has taken note
+ // of the change in Channel State
+
+ // Here we have become unsuspended and so we ask each the queue to
+ // perform an Async delivery for each of the subscriptions in this
+ // Channel. The alternative would be to ensure that the subscription
+ // had received the change in suspension state. That way the logic
+ // behind decieding to start an async delivery was located with the
+ // Subscription.
if (wasSuspended)
{
// may need to deliver queued messages
@@ -801,6 +816,38 @@ public class AMQChannel
s.getQueue().deliverAsync(s);
}
}
+
+
+ // Here we have become suspended so we need to ensure that each of
+ // the Subscriptions has noticed this change so that we can be sure
+ // they are not still sending messages. Again the code here is a
+ // very simplistic approach to ensure that the change of suspension
+ // has been noticed by each of the Subscriptions. Unlike the above
+ // case we don't actually need to do anything else.
+ if (!wasSuspended)
+ {
+ // may need to deliver queued messages
+ for (Subscription s : _tag2SubscriptionMap.values())
+ {
+ try
+ {
+ s.getSendLock();
+ }
+ finally
+ {
+ s.releaseSendLock();
+ }
+ }
+ }
+
+
+ // Log Suspension only after we have confirmed all suspensions are
+ // stopped.
+ if (suspended)
+ {
+ _actor.message(_logSubject, ChannelMessages.CHN_1002("Stopped"));
+ }
+
}
}