diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java | 119 |
1 files changed, 72 insertions, 47 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 101a2833a0..d8bacc8c7d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.Managable; @@ -90,22 +91,36 @@ public class AMQQueue implements Managable, Comparable /** * max allowed size(KB) of a single message */ - private long _maxMessageSize = 10000; + private long _maximumMessageSize = 10000; /** * max allowed number of messages on a queue. */ - private Integer _maxMessageCount = 10000; + @Configured(path = "maximumMessageCount", defaultValue = "0") + public int _maximumMessageCount; /** - * max queue depth(KB) for the queue + * max queue depth for the queue */ - private long _maxQueueDepth = 10000000; + @Configured(path = "maximumQueueDepth", defaultValue = "0") + public long _maximumQueueDepth = 10000000; + + /* + * maximum message age before alerts occur + */ + @Configured(path = "maximumMessageAge", defaultValue = "0") + public long _maximumMessageAge = 30000; //0 + + /* + * the minimum interval between sending out consequetive alerts of the same type + */ + @Configured(path = "minimumAlertRepeatGap", defaultValue = "0") + public long _minimumAlertRepeatGap = 30000; /** * total messages received by the queue since startup. */ - private long _totalMessagesReceived = 0; + public long _totalMessagesReceived = 0; public int compareTo(Object o) { @@ -183,35 +198,13 @@ public class AMQQueue implements Managable, Comparable _autoDelete = autoDelete; _queueRegistry = queueRegistry; _asyncDelivery = asyncDelivery; + _managedObject = createMBean(); _managedObject.register(); + _subscribers = subscribers; _subscriptionFactory = subscriptionFactory; - - //fixme - Make this configurable via the broker config.xml - if (System.getProperties().getProperty("deliverymanager") != null) - { - if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager")) - { - _logger.info("Using ConcurrentSelectorDeliveryManager"); - _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); - } - else if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager")) - { - _logger.info("Using ConcurrentDeliveryManager"); - _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this); - } - else - { - _logger.info("Using SynchronizedDeliveryManager"); - _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this); - } - } - else - { - _logger.info("Using Default DeliveryManager: ConcurrentSelectorDeliveryManager"); - _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); - } + _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); } private AMQQueueMBean createMBean() throws AMQException @@ -267,6 +260,11 @@ public class AMQQueue implements Managable, Comparable return _deliveryMgr.getMessages(); } + public long getQueueDepth() + { + return _deliveryMgr.getTotalMessageSize(); + } + /** * @param messageId * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist. @@ -295,50 +293,55 @@ public class AMQQueue implements Managable, Comparable return _managedObject; } - public Long getMaximumMessageSize() + public long getMaximumMessageSize() { - return _maxMessageSize; + return _maximumMessageSize; } - public void setMaximumMessageSize(Long value) + public void setMaximumMessageSize(long value) { - _maxMessageSize = value; + _maximumMessageSize = value; } - public Integer getConsumerCount() + public int getConsumerCount() { return _subscribers.size(); } - public Integer getActiveConsumerCount() + public int getActiveConsumerCount() { return _subscribers.getWeight(); } - public Long getReceivedMessageCount() + public long getReceivedMessageCount() { return _totalMessagesReceived; } - public Integer getMaximumMessageCount() + public int getMaximumMessageCount() { - return _maxMessageCount; + return _maximumMessageCount; } - public void setMaximumMessageCount(Integer value) + public void setMaximumMessageCount(int value) { - _maxMessageCount = value; + _maximumMessageCount = value; } - public Long getMaximumQueueDepth() + public long getMaximumQueueDepth() { - return _maxQueueDepth; + return _maximumQueueDepth; } // Sets the queue depth, the max queue size - public void setMaximumQueueDepth(Long value) + public void setMaximumQueueDepth(long value) { - _maxQueueDepth = value; + _maximumQueueDepth = value; + } + + public long getOldestMessageArrivalTime() + { + return _deliveryMgr.getOldestMessageArrival(); } /** @@ -374,11 +377,11 @@ public class AMQQueue implements Managable, Comparable Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal); - if(subscription.hasFilters()) + if (subscription.hasFilters()) { if (_deliveryMgr.hasQueuedMessages()) { - _deliveryMgr.populatePreDeliveryQueue(subscription); + _deliveryMgr.populatePreDeliveryQueue(subscription); } } @@ -551,6 +554,27 @@ public class AMQQueue implements Managable, Comparable } } + public long getMinimumAlertRepeatGap() + { + return _minimumAlertRepeatGap; + } + + public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap) + { + _minimumAlertRepeatGap = minimumAlertRepeatGap; + } + + public long getMaximumMessageAge() + { + return _maximumMessageAge; + } + + public void setMaximumMessageAge(long maximumMessageAge) + { + _maximumMessageAge = maximumMessageAge; + } + + private class Deliver implements TxnOp { private final AMQMessage _msg; @@ -591,4 +615,5 @@ public class AMQQueue implements Managable, Comparable } } + } |