summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java97
1 files changed, 84 insertions, 13 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 53c36d9718..7c6db0b4b3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -26,6 +26,7 @@ import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
@@ -36,7 +37,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import javax.management.JMException;
import java.text.MessageFormat;
-import java.util.List;
+import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -154,10 +155,9 @@ public class AMQQueue implements Managable, Comparable
/** total messages received by the queue since startup. */
public AtomicLong _totalMessagesReceived = new AtomicLong();
- public int compareTo(Object o)
- {
- return _name.compareTo(((AMQQueue) o).getName());
- }
+
+ private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
+
public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
@@ -200,6 +200,13 @@ public class AMQQueue implements Managable, Comparable
_subscribers = subscribers;
_subscriptionFactory = subscriptionFactory;
_deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
+
+ // This ensure that the notification checks for the configured alerts are created.
+ setMaximumMessageAge(_maximumMessageAge);
+ setMaximumMessageCount(_maximumMessageCount);
+ setMaximumMessageSize(_maximumMessageSize);
+ setMaximumQueueDepth(_maximumQueueDepth);
+
}
private AMQQueueMBean createMBean() throws AMQException
@@ -214,7 +221,7 @@ public class AMQQueue implements Managable, Comparable
}
}
- public AMQShortString getName()
+ public final AMQShortString getName()
{
return _name;
}
@@ -540,9 +547,17 @@ public class AMQQueue implements Managable, Comparable
return _maximumMessageSize;
}
- public void setMaximumMessageSize(long value)
+ public void setMaximumMessageSize(final long maximumMessageSize)
{
- _maximumMessageSize = value;
+ _maximumMessageSize = maximumMessageSize;
+ if(maximumMessageSize == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT);
+ }
}
public int getConsumerCount()
@@ -565,9 +580,20 @@ public class AMQQueue implements Managable, Comparable
return _maximumMessageCount;
}
- public void setMaximumMessageCount(long value)
+ public void setMaximumMessageCount(final long maximumMessageCount)
{
- _maximumMessageCount = value;
+ _maximumMessageCount = maximumMessageCount;
+ if(maximumMessageCount == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT);
+ }
+
+
+
}
public long getMaximumQueueDepth()
@@ -576,9 +602,18 @@ public class AMQQueue implements Managable, Comparable
}
// Sets the queue depth, the max queue size
- public void setMaximumQueueDepth(long value)
+ public void setMaximumQueueDepth(final long maximumQueueDepth)
{
- _maximumQueueDepth = value;
+ _maximumQueueDepth = maximumQueueDepth;
+ if(maximumQueueDepth == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT);
+ }
+
}
public long getOldestMessageArrivalTime()
@@ -661,6 +696,12 @@ public class AMQQueue implements Managable, Comparable
}
_subscribers.addSubscriber(subscription);
+ if(exclusive)
+ {
+ _subscribers.setExclusive(true);
+ }
+
+ subscription.start();
}
private boolean isExclusive()
@@ -692,6 +733,7 @@ public class AMQQueue implements Managable, Comparable
ps, channel, consumerTag, this));
}
+ _subscribers.setExclusive(false);
Subscription removedSubscription;
if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps,
consumerTag)))
@@ -805,7 +847,7 @@ public class AMQQueue implements Managable, Comparable
public void process(StoreContext storeContext, QueueEntry entry, boolean deliverFirst) throws AMQException
{
AMQMessage msg = entry.getMessage();
- _deliveryMgr.deliver(storeContext, getName(), entry, deliverFirst);
+ _deliveryMgr.deliver(storeContext, _name, entry, deliverFirst);
try
{
msg.checkDeliveredToConsumer();
@@ -938,6 +980,14 @@ public class AMQQueue implements Managable, Comparable
public void setMaximumMessageAge(long maximumMessageAge)
{
_maximumMessageAge = maximumMessageAge;
+ if(maximumMessageAge == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT);
+ }
}
public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, QueueEntry entry)
@@ -950,4 +1000,25 @@ public class AMQQueue implements Managable, Comparable
return new QueueEntry(this, amqMessage);
}
+ public int compareTo(Object o)
+ {
+ return _name.compareTo(((AMQQueue) o).getName());
+ }
+
+
+ public void removeExpiredIfNoSubscribers() throws AMQException
+ {
+ synchronized(_subscribers.getChangeLock())
+ {
+ if(_subscribers.isEmpty())
+ {
+ _deliveryMgr.removeExpired();
+ }
+ }
+ }
+
+ public final Set<NotificationCheck> getNotificationChecks()
+ {
+ return _notificationChecks;
+ }
}