diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java | 97 |
1 files changed, 84 insertions, 13 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 53c36d9718..7c6db0b4b3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -26,6 +26,7 @@ import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; @@ -36,7 +37,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import javax.management.JMException; import java.text.MessageFormat; -import java.util.List; +import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -154,10 +155,9 @@ public class AMQQueue implements Managable, Comparable /** total messages received by the queue since startup. */ public AtomicLong _totalMessagesReceived = new AtomicLong(); - public int compareTo(Object o) - { - return _name.compareTo(((AMQQueue) o).getName()); - } + + private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class); + public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException @@ -200,6 +200,13 @@ public class AMQQueue implements Managable, Comparable _subscribers = subscribers; _subscriptionFactory = subscriptionFactory; _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); + + // This ensure that the notification checks for the configured alerts are created. + setMaximumMessageAge(_maximumMessageAge); + setMaximumMessageCount(_maximumMessageCount); + setMaximumMessageSize(_maximumMessageSize); + setMaximumQueueDepth(_maximumQueueDepth); + } private AMQQueueMBean createMBean() throws AMQException @@ -214,7 +221,7 @@ public class AMQQueue implements Managable, Comparable } } - public AMQShortString getName() + public final AMQShortString getName() { return _name; } @@ -540,9 +547,17 @@ public class AMQQueue implements Managable, Comparable return _maximumMessageSize; } - public void setMaximumMessageSize(long value) + public void setMaximumMessageSize(final long maximumMessageSize) { - _maximumMessageSize = value; + _maximumMessageSize = maximumMessageSize; + if(maximumMessageSize == 0L) + { + _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT); + } + else + { + _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT); + } } public int getConsumerCount() @@ -565,9 +580,20 @@ public class AMQQueue implements Managable, Comparable return _maximumMessageCount; } - public void setMaximumMessageCount(long value) + public void setMaximumMessageCount(final long maximumMessageCount) { - _maximumMessageCount = value; + _maximumMessageCount = maximumMessageCount; + if(maximumMessageCount == 0L) + { + _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT); + } + else + { + _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT); + } + + + } public long getMaximumQueueDepth() @@ -576,9 +602,18 @@ public class AMQQueue implements Managable, Comparable } // Sets the queue depth, the max queue size - public void setMaximumQueueDepth(long value) + public void setMaximumQueueDepth(final long maximumQueueDepth) { - _maximumQueueDepth = value; + _maximumQueueDepth = maximumQueueDepth; + if(maximumQueueDepth == 0L) + { + _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT); + } + else + { + _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT); + } + } public long getOldestMessageArrivalTime() @@ -661,6 +696,12 @@ public class AMQQueue implements Managable, Comparable } _subscribers.addSubscriber(subscription); + if(exclusive) + { + _subscribers.setExclusive(true); + } + + subscription.start(); } private boolean isExclusive() @@ -692,6 +733,7 @@ public class AMQQueue implements Managable, Comparable ps, channel, consumerTag, this)); } + _subscribers.setExclusive(false); Subscription removedSubscription; if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps, consumerTag))) @@ -805,7 +847,7 @@ public class AMQQueue implements Managable, Comparable public void process(StoreContext storeContext, QueueEntry entry, boolean deliverFirst) throws AMQException { AMQMessage msg = entry.getMessage(); - _deliveryMgr.deliver(storeContext, getName(), entry, deliverFirst); + _deliveryMgr.deliver(storeContext, _name, entry, deliverFirst); try { msg.checkDeliveredToConsumer(); @@ -938,6 +980,14 @@ public class AMQQueue implements Managable, Comparable public void setMaximumMessageAge(long maximumMessageAge) { _maximumMessageAge = maximumMessageAge; + if(maximumMessageAge == 0L) + { + _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT); + } + else + { + _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT); + } } public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, QueueEntry entry) @@ -950,4 +1000,25 @@ public class AMQQueue implements Managable, Comparable return new QueueEntry(this, amqMessage); } + public int compareTo(Object o) + { + return _name.compareTo(((AMQQueue) o).getName()); + } + + + public void removeExpiredIfNoSubscribers() throws AMQException + { + synchronized(_subscribers.getChangeLock()) + { + if(_subscribers.isEmpty()) + { + _deliveryMgr.removeExpired(); + } + } + } + + public final Set<NotificationCheck> getNotificationChecks() + { + return _notificationChecks; + } } |