summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java119
1 files changed, 72 insertions, 47 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 101a2833a0..d8bacc8c7d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.Managable;
@@ -90,22 +91,36 @@ public class AMQQueue implements Managable, Comparable
/**
* max allowed size(KB) of a single message
*/
- private long _maxMessageSize = 10000;
+ private long _maximumMessageSize = 10000;
/**
* max allowed number of messages on a queue.
*/
- private Integer _maxMessageCount = 10000;
+ @Configured(path = "maximumMessageCount", defaultValue = "0")
+ public int _maximumMessageCount;
/**
- * max queue depth(KB) for the queue
+ * max queue depth for the queue
*/
- private long _maxQueueDepth = 10000000;
+ @Configured(path = "maximumQueueDepth", defaultValue = "0")
+ public long _maximumQueueDepth = 10000000;
+
+ /*
+ * maximum message age before alerts occur
+ */
+ @Configured(path = "maximumMessageAge", defaultValue = "0")
+ public long _maximumMessageAge = 30000; //0
+
+ /*
+ * the minimum interval between sending out consequetive alerts of the same type
+ */
+ @Configured(path = "minimumAlertRepeatGap", defaultValue = "0")
+ public long _minimumAlertRepeatGap = 30000;
/**
* total messages received by the queue since startup.
*/
- private long _totalMessagesReceived = 0;
+ public long _totalMessagesReceived = 0;
public int compareTo(Object o)
{
@@ -183,35 +198,13 @@ public class AMQQueue implements Managable, Comparable
_autoDelete = autoDelete;
_queueRegistry = queueRegistry;
_asyncDelivery = asyncDelivery;
+
_managedObject = createMBean();
_managedObject.register();
+
_subscribers = subscribers;
_subscriptionFactory = subscriptionFactory;
-
- //fixme - Make this configurable via the broker config.xml
- if (System.getProperties().getProperty("deliverymanager") != null)
- {
- if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager"))
- {
- _logger.info("Using ConcurrentSelectorDeliveryManager");
- _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
- }
- else if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager"))
- {
- _logger.info("Using ConcurrentDeliveryManager");
- _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
- }
- else
- {
- _logger.info("Using SynchronizedDeliveryManager");
- _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
- }
- }
- else
- {
- _logger.info("Using Default DeliveryManager: ConcurrentSelectorDeliveryManager");
- _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
- }
+ _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
}
private AMQQueueMBean createMBean() throws AMQException
@@ -267,6 +260,11 @@ public class AMQQueue implements Managable, Comparable
return _deliveryMgr.getMessages();
}
+ public long getQueueDepth()
+ {
+ return _deliveryMgr.getTotalMessageSize();
+ }
+
/**
* @param messageId
* @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
@@ -295,50 +293,55 @@ public class AMQQueue implements Managable, Comparable
return _managedObject;
}
- public Long getMaximumMessageSize()
+ public long getMaximumMessageSize()
{
- return _maxMessageSize;
+ return _maximumMessageSize;
}
- public void setMaximumMessageSize(Long value)
+ public void setMaximumMessageSize(long value)
{
- _maxMessageSize = value;
+ _maximumMessageSize = value;
}
- public Integer getConsumerCount()
+ public int getConsumerCount()
{
return _subscribers.size();
}
- public Integer getActiveConsumerCount()
+ public int getActiveConsumerCount()
{
return _subscribers.getWeight();
}
- public Long getReceivedMessageCount()
+ public long getReceivedMessageCount()
{
return _totalMessagesReceived;
}
- public Integer getMaximumMessageCount()
+ public int getMaximumMessageCount()
{
- return _maxMessageCount;
+ return _maximumMessageCount;
}
- public void setMaximumMessageCount(Integer value)
+ public void setMaximumMessageCount(int value)
{
- _maxMessageCount = value;
+ _maximumMessageCount = value;
}
- public Long getMaximumQueueDepth()
+ public long getMaximumQueueDepth()
{
- return _maxQueueDepth;
+ return _maximumQueueDepth;
}
// Sets the queue depth, the max queue size
- public void setMaximumQueueDepth(Long value)
+ public void setMaximumQueueDepth(long value)
{
- _maxQueueDepth = value;
+ _maximumQueueDepth = value;
+ }
+
+ public long getOldestMessageArrivalTime()
+ {
+ return _deliveryMgr.getOldestMessageArrival();
}
/**
@@ -374,11 +377,11 @@ public class AMQQueue implements Managable, Comparable
Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal);
- if(subscription.hasFilters())
+ if (subscription.hasFilters())
{
if (_deliveryMgr.hasQueuedMessages())
{
- _deliveryMgr.populatePreDeliveryQueue(subscription);
+ _deliveryMgr.populatePreDeliveryQueue(subscription);
}
}
@@ -551,6 +554,27 @@ public class AMQQueue implements Managable, Comparable
}
}
+ public long getMinimumAlertRepeatGap()
+ {
+ return _minimumAlertRepeatGap;
+ }
+
+ public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap)
+ {
+ _minimumAlertRepeatGap = minimumAlertRepeatGap;
+ }
+
+ public long getMaximumMessageAge()
+ {
+ return _maximumMessageAge;
+ }
+
+ public void setMaximumMessageAge(long maximumMessageAge)
+ {
+ _maximumMessageAge = maximumMessageAge;
+ }
+
+
private class Deliver implements TxnOp
{
private final AMQMessage _msg;
@@ -591,4 +615,5 @@ public class AMQQueue implements Managable, Comparable
}
}
+
}