diff options
author | Robert Greig <rgreig@apache.org> | 2007-02-05 09:40:04 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-02-05 09:40:04 +0000 |
commit | 8949f938b39c2a2235f31bb2035174eedc9ba7b7 (patch) | |
tree | 6619a2926e7946124513d2dee0f3b79ae19a4df2 | |
parent | 3da36fe4417019b417217e8feb22b8de11cbed0a (diff) | |
download | qpid-python-8949f938b39c2a2235f31bb2035174eedc9ba7b7.tar.gz |
QPID-326 : Patch supplied by Rob Godfrey - add oldest message on queue notification, and log notifications in log file
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@503604 13f79535-47bb-0310-9956-ffa450edef68
15 files changed, 482 insertions, 173 deletions
diff --git a/java/broker/etc/virtualhosts.xml b/java/broker/etc/virtualhosts.xml index 50cddb5661..2a573535de 100644 --- a/java/broker/etc/virtualhosts.xml +++ b/java/broker/etc/virtualhosts.xml @@ -21,18 +21,79 @@ --> <virtualhosts> <virtualhost> - <path>localhost</path> - <bind>direct://amq.direct//queue</bind> - <bind>direct://amq.direct//ping</bind> + <name>localhost</name> + + <localhost> + <minimumAlertRepeatGap>30000</minimumAlertRepeatGap> + <maximumMessageCount>5000</maximumMessageCount> + <queue> + <name>queue</name> + <queue> + <exchange>amq.direct</exchange> + <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> + <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> + <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> + </queue> + </queue> + <queue> + <name>ping</name> + <ping> + <exchange>amq.direct</exchange> + <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> + <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> + <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> + </ping> + </queue> + </localhost> </virtualhost> <virtualhost> - <path>development</path> - <bind>direct://amq.direct//queue</bind> - <bind>direct://amq.direct//ping</bind> + <name>development</name> + <minimumAlertRepeatGap>30000</minimumAlertRepeatGap> + <maximumMessageCount>5000</maximumMessageCount> + <development> + <queue> + <name>queue</name> + <queue> + <exchange>amq.direct</exchange> + <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> + <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> + <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> + </queue> + </queue> + <queue> + <name>ping</name> + <ping> + <exchange>amq.direct</exchange> + <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> + <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> + <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> + </ping> + </queue> + </development> </virtualhost> <virtualhost> - <path>test</path> - <bind>direct://amq.direct//queue</bind> - <bind>direct://amq.direct//ping</bind> + <name>test</name> + <minimumAlertRepeatGap>30000</minimumAlertRepeatGap> + <maximumMessageCount>5000</maximumMessageCount> + <test> + <queue> + <name>queue</name> + <queue> + <exchange>amq.direct</exchange> + <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> + <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> + <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> + </queue> + </queue> + <queue> + <name>ping</name> + <ping> + <exchange>amq.direct</exchange> + <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> + <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> + <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> + </ping> + </queue> + </test> </virtualhost> </virtualhosts> diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index 361a21b284..c191bef447 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -34,9 +34,13 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.log4j.Logger; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.CompositeConfiguration; import java.util.Collection; +import java.util.List; +import java.util.Collections; public class VirtualHostConfiguration { @@ -44,11 +48,7 @@ public class VirtualHostConfiguration XMLConfiguration _config; - private static final String XML_VIRTUALHOST = "virtualhost"; - private static final String XML_PATH = "path"; - private static final String XML_BIND = "bind"; - private static final String XML_VIRTUALHOST_PATH = "virtualhost.path"; - private static final String XML_VIRTUALHOST_BIND = "virtualhost.bind"; + private static final String VIRTUALHOST_PROPERTY_BASE = "virtualhost."; public VirtualHostConfiguration(String configFile) throws ConfigurationException @@ -57,137 +57,66 @@ public class VirtualHostConfiguration _config = new XMLConfiguration(configFile); - if (_config.getProperty(XML_VIRTUALHOST_PATH) == null) - { - throw new ConfigurationException( - "Virtualhost Configuration document does not contain a valid virtualhost."); - } } - public void performBindings() throws AMQException, ConfigurationException, URLSyntaxException - { - Object prop = _config.getProperty(XML_VIRTUALHOST_PATH); - - if (prop instanceof Collection) - { - _logger.debug("Number of VirtualHosts: " + ((Collection) prop).size()); - int virtualhosts = ((Collection) prop).size(); - for (int vhost = 0; vhost < virtualhosts; vhost++) - { - loadVirtualHost(vhost); - } - } - else - { - loadVirtualHost(-1); - } - } - private void loadVirtualHost(int index) throws AMQException, ConfigurationException, URLSyntaxException + private void configureVirtualHost(String virtualHostName, Configuration configuration) throws ConfigurationException, AMQException { - String path = XML_VIRTUALHOST; - - if (index != -1) - { - path = path + "(" + index + ")"; - } - - Object prop = _config.getProperty(path + "." + XML_PATH); - - if (prop == null) - { - prop = _config.getProperty(path + "." + XML_BIND); - String error = "Virtual Host not defined for binding"; - - if (prop != null) - { - if (prop instanceof Collection) - { - error += "s"; - } + _logger.debug("Loding configuration for virtaulhost: "+virtualHostName); - error += ": " + prop; - } - throw new ConfigurationException(error); - } + VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(virtualHostName); - _logger.info("VirtualHost:'" + prop + "'"); - String virtualHost = prop.toString(); - prop = _config.getProperty(path + "." + XML_BIND); - if (prop instanceof Collection) - { - int bindings = ((Collection) prop).size(); - _logger.debug("Number of Bindings: " + bindings); - for (int dest = 0; dest < bindings; dest++) - { - loadBinding(virtualHost, path, dest); - } - } - else - { - loadBinding(virtualHost,path, -1); - } - } - private void loadBinding(String virtualHost, String rootpath, int index) throws AMQException, ConfigurationException, URLSyntaxException - { - String path = rootpath + "." + XML_BIND; - if (index != -1) + if(virtualHost == null) { - path = path + "(" + index + ")"; + throw new ConfigurationException("Unknown virtual host: " + virtualHostName); } - String bindingString = _config.getString(path); - - AMQBindingURL binding = new AMQBindingURL(bindingString); + List queueNames = configuration.getList("queue.name"); - _logger.debug("Loaded Binding:" + binding); - - try - { - bind(virtualHost, binding); - } - catch (AMQException amqe) + for(Object queueNameObj : queueNames) { - _logger.info("Unable to bind url: " + binding); - throw amqe; + String queueName = String.valueOf(queueNameObj); + configureQueue(virtualHost, queueName, configuration); } + } - private void bind(String virtualHostName, AMQBindingURL binding) throws AMQException, ConfigurationException + private void configureQueue(VirtualHost virtualHost, String queueNameString, Configuration configuration) throws AMQException, ConfigurationException { + CompositeConfiguration queueConfiguration = new CompositeConfiguration(); - AMQShortString queueName = binding.getQueueName(); + queueConfiguration.addConfiguration(configuration.subset("queue."+ queueNameString)); + queueConfiguration.addConfiguration(configuration); - // This will occur if the URL is a Topic - if (queueName == null) - { - //todo register valid topic - ///queueName = binding.getDestinationName(); - throw new AMQException("Topics cannot be bound. TODO Register valid topic"); - } - - //Get references to Broker Registries - VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(virtualHostName); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); MessageStore messageStore = virtualHost.getMessageStore(); ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + + AMQShortString queueName = new AMQShortString(queueNameString); + + AMQQueue queue; + synchronized (queueRegistry) { - AMQQueue queue = queueRegistry.getQueue(queueName); + queue = queueRegistry.getQueue(queueName); if (queue == null) { - _logger.info("Queue '" + binding.getQueueName() + "' does not exists. Creating."); + _logger.info("Creating queue '" + queueName + "' on virtual host " + virtualHost.getName()); + + boolean durable = queueConfiguration.getBoolean("durable" ,false); + boolean autodelete = queueConfiguration.getBoolean("autodelete", false); + String owner = queueConfiguration.getString("owner", null); queue = new AMQQueue(queueName, - Boolean.parseBoolean(binding.getOption(AMQBindingURL.OPTION_DURABLE)), - null /* These queues will have no owner */, - false /* Therefore autodelete makes no sence */, virtualHost); + durable, + owner == null ? null : new AMQShortString(owner) /* These queues will have no owner */, + autodelete /* Therefore autodelete makes no sence */, virtualHost); if (queue.isDurable()) { @@ -198,27 +127,69 @@ public class VirtualHostConfiguration } else { - _logger.info("Queue '" + binding.getQueueName() + "' already exists not creating."); + _logger.info("Queue '" + queueNameString + "' already exists on virtual host "+virtualHost.getName()+", not creating."); } - Exchange defaultExchange = exchangeRegistry.getExchange(binding.getExchangeName()); - synchronized (defaultExchange) + String exchangeName = queueConfiguration.getString("exchange", null); + + Exchange exchange = exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName)); + + if(exchange == null) { - if (defaultExchange == null) - { - throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + binding); - } + exchange = virtualHost.getExchangeRegistry().getDefaultExchange(); + } - defaultExchange.registerQueue(queue.getName(), queue, null); + if (exchange == null) + { + throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName); + } - if (binding.getRoutingKey() == null || binding.getRoutingKey().equals("")) + synchronized (exchange) + { + List routingKeys = queueConfiguration.getList("routingKey"); + if(routingKeys == null || routingKeys.isEmpty()) { - throw new ConfigurationException("Unknown binding not specified on url:" + binding); + routingKeys = Collections.singletonList(queue.getName()); } - queue.bind(binding.getRoutingKey(), defaultExchange); + for(Object routingKeyNameObj : routingKeys) + { + AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj)); + exchange.registerQueue(routingKey, queue, null); + + queue.bind(routingKey, exchange); + + _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'"); + } } - _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + binding.getExchangeName() + " RK:'" + binding.getRoutingKey() + "'"); + } + + + + Configurator.configure(queue, queueConfiguration); } + + + public void performBindings() throws AMQException, ConfigurationException + { + List virtualHostNames = _config.getList(VIRTUALHOST_PROPERTY_BASE + "name"); + + _logger.info("Configuring " + virtualHostNames == null ? 0 : virtualHostNames.size() + " virtual hosts: " + virtualHostNames); + + for(Object nameObject : virtualHostNames) + { + String name = String.valueOf(nameObject); + configureVirtualHost(name, _config.subset(VIRTUALHOST_PROPERTY_BASE + name)); + } + + if (virtualHostNames == null || virtualHostNames.isEmpty()) + { + throw new ConfigurationException( + "Virtualhost Configuration document does not contain a valid virtualhost."); + } + } + + + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index 374772bc4a..cb3b1eafa6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -67,6 +67,11 @@ public class DefaultExchangeRegistry implements ExchangeRegistry _defaultExchange = exchange; } + public Exchange getDefaultExchange() + { + return _defaultExchange; + } + public void unregisterExchange(AMQShortString name, boolean inUse) throws AMQException { // TODO: check inUse argument diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java index 24884d20d7..a022b86299 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java @@ -40,4 +40,6 @@ public interface ExchangeRegistry extends MessageRouter Exchange getExchange(AMQShortString name); void setDefaultExchange(Exchange exchange); + + Exchange getDefaultExchange(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 7a16901796..71fecc45fa 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -509,6 +509,12 @@ public class AMQMessage _messageHandle.setRedelivered(redelivered); } + public long getArrivalTime() + { + return _messageHandle.getArrivalTime(); + } + + /** * Called when this message is delivered to a consumer. (used to * implement the 'immediate' flag functionality). diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java index d788d1b9e2..f07706c2e6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java @@ -74,4 +74,6 @@ public interface AMQMessageHandle void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException; void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException; + + long getArrivalTime(); }
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index ab7bbefe92..aa372a3b99 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.exchange.Exchange; @@ -35,7 +36,6 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import javax.management.JMException; import java.text.MessageFormat; import java.util.List; -import java.util.ArrayList; import java.util.concurrent.Executor; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; @@ -130,22 +130,36 @@ public class AMQQueue implements Managable, Comparable /** * max allowed size(KB) of a single message */ - private long _maxMessageSize = 10000; + private long _maximumMessageSize = 10000; /** * max allowed number of messages on a queue. */ - private Integer _maxMessageCount = 10000; + @Configured(path = "maximumMessageCount", defaultValue = "0") + public int _maximumMessageCount; /** - * max queue depth(KB) for the queue + * max queue depth for the queue */ - private long _maxQueueDepth = 10000000; + @Configured(path = "maximumQueueDepth", defaultValue = "0") + public long _maximumQueueDepth = 10000000; + +/* + * maximum message age before alerts occur + */ + @Configured(path = "maximumMessageAge", defaultValue = "0") + public long _maximumMessageAge = 30000; //0 + + /* + * the minimum interval between sending out consequetive alerts of the same type + */ + @Configured(path = "minimumAlertRepeatGap", defaultValue = "0") + public long _minimumAlertRepeatGap = 30000; /** * total messages received by the queue since startup. */ - private long _totalMessagesReceived = 0; + public long _totalMessagesReceived = 0; public int compareTo(Object o) { @@ -286,50 +300,56 @@ public class AMQQueue implements Managable, Comparable return _managedObject; } - public Long getMaximumMessageSize() + public long getMaximumMessageSize() { - return _maxMessageSize; + return _maximumMessageSize; } - public void setMaximumMessageSize(Long value) + public void setMaximumMessageSize(long value) { - _maxMessageSize = value; + _maximumMessageSize = value; } - public Integer getConsumerCount() + public int getConsumerCount() { return _subscribers.size(); } - public Integer getActiveConsumerCount() + public int getActiveConsumerCount() { return _subscribers.getWeight(); } - public Long getReceivedMessageCount() + public long getReceivedMessageCount() { return _totalMessagesReceived; } - public Integer getMaximumMessageCount() + public int getMaximumMessageCount() { - return _maxMessageCount; + return _maximumMessageCount; } - public void setMaximumMessageCount(Integer value) + public void setMaximumMessageCount(int value) { - _maxMessageCount = value; + _maximumMessageCount = value; } public long getMaximumQueueDepth() { - return _maxQueueDepth; + return _maximumQueueDepth; } // Sets the queue depth, the max queue size public void setMaximumQueueDepth(long value) { - _maxQueueDepth = value; + _maximumQueueDepth = value; + } + + public long getOldestMessageArrivalTime() + { + return _deliveryMgr.getOldestMessageArrival(); + } /** @@ -631,6 +651,24 @@ public class AMQQueue implements Managable, Comparable _deleteTaskList.add(task); } + public long getMinimumAlertRepeatGap() + { + return _minimumAlertRepeatGap; + } + + public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap) + { + _minimumAlertRepeatGap = minimumAlertRepeatGap; + } + public long getMaximumMessageAge() + { + return _maximumMessageAge; + } + + public void setMaximumMessageAge(long maximumMessageAge) + { + _maximumMessageAge = maximumMessageAge; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index e15dc648f7..bee2e3950c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -22,12 +22,12 @@ import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.Main; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.mina.common.ByteBuffer; +import org.apache.log4j.Logger; import javax.management.openmbean.*; import javax.management.*; @@ -41,8 +41,11 @@ import java.util.Iterator; * for an AMQQueue. */ @MBeanDescription("Management Interface for AMQQueue") -public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue +public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener { + + private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class); + /** * Since the MBean is not associated with a real channel we can safely create our own store context * for use in the few methods that require one. @@ -63,6 +66,9 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue private final static String[] _msgContentAttributes = {"AMQ MessageId", "MimeType", "Encoding", "Content"}; private static OpenType[] _msgContentAttributeTypes = new OpenType[4]; + + private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length]; + @MBeanConstructor("Creates an MBean exposing an AMQQueue") public AMQQueueMBean(AMQQueue queue) throws JMException { @@ -213,38 +219,38 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue return msg.getContentHeaderBody().bodySize; } + + /** * Checks if there is any notification to be send to the listeners */ public void checkForNotification(AMQMessage msg) throws AMQException, JMException { - // Check for threshold message count - Integer msgCount = getMessageCount(); - if (msgCount >= getMaximumMessageCount()) - { - notifyClients("Message count(" + msgCount + ") has reached or exceeded the threshold high value"); - } - // Check for threshold message size - long messageSize = getMessageSize(msg); - if (messageSize >= _queue.getMaximumMessageSize()) - { - notifyClients("Message size(ID=" + msg.getMessageId() + ", size=" + messageSize + " bytes) is higher than the threshold value"); - } + final long currentTime = System.currentTimeMillis(); + final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap(); - // Check for threshold queue depth in bytes - long queueDepth = getQueueDepthKb(); - if (queueDepth >= _queue.getMaximumQueueDepth()) + for(NotificationCheck check : NotificationCheck.values()) { - notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value"); + if(check.isMessageSpecific() || _lastNotificationTimes[check.ordinal()]<thresholdTime) + { + if(check.notifyIfNecessary(msg, _queue, this)) + { + _lastNotificationTimes[check.ordinal()] = currentTime; + } + } } + } /** * Sends the notification to the listeners */ - private void notifyClients(String notificationMsg) + public void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg) { + // important : add log to the log file - monitoring tools may be looking for this + _logger.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg); + Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 2103d79310..3a9ce64c57 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -160,6 +160,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return _totalMessageSize.get(); } + public long getOldestMessageArrival() + { + AMQMessage msg = _messages.peek(); + return msg == null ? Long.MAX_VALUE : msg.getArrivalTime(); + } + public synchronized List<AMQMessage> getMessages() { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index 7d7ede0732..f7820e1465 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -83,4 +83,6 @@ interface DeliveryManager boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException; long getTotalMessageSize(); + + long getOldestMessageArrival(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java index 186a5c8753..4f7dc21598 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java @@ -43,6 +43,8 @@ public class InMemoryMessageHandle implements AMQMessageHandle private boolean _redelivered; + private long _arrivalTime; + public InMemoryMessageHandle() { } @@ -114,6 +116,7 @@ public class InMemoryMessageHandle implements AMQMessageHandle { _publishBody = publishBody; _contentHeaderBody = contentHeaderBody; + _arrivalTime = System.currentTimeMillis(); } public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException @@ -130,4 +133,10 @@ public class InMemoryMessageHandle implements AMQMessageHandle { // NO OP } + + public long getArrivalTime() + { + return _arrivalTime; + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java index deed18c188..a66a85e54d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java @@ -31,11 +31,19 @@ public class MessageMetaData private int _contentChunkCount; + private long _arrivalTime; + public MessageMetaData(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount) { + this(publishBody,contentHeaderBody, contentChunkCount, System.currentTimeMillis()); + } + + public MessageMetaData(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount, long arrivalTime) + { _contentHeaderBody = contentHeaderBody; _publishBody = publishBody; _contentChunkCount = contentChunkCount; + _arrivalTime = arrivalTime; } public int getContentChunkCount() @@ -67,4 +75,14 @@ public class MessageMetaData { _publishBody = publishBody; } + + public long getArrivalTime() + { + return _arrivalTime; + } + + public void setArrivalTime(long arrivalTime) + { + _arrivalTime = arrivalTime; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java new file mode 100644 index 0000000000..bc8e1232a7 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java @@ -0,0 +1,135 @@ +/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+public enum NotificationCheck
+{
+
+ MESSAGE_COUNT_ALERT
+ {
+ boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+ int msgCount = queue.getMessageCount();
+ final int maximumMessageCount = queue.getMaximumMessageCount();
+ if (maximumMessageCount!= 0 && msgCount >= maximumMessageCount)
+ {
+ listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
+ return true;
+ }
+ return false;
+ }
+ },
+ MESSAGE_SIZE_ALERT(true)
+ {
+ boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+ final long maximumMessageSize = queue.getMaximumMessageSize();
+ if(maximumMessageSize != 0)
+ {
+ // Check for threshold message size
+ long messageSize;
+ try
+ {
+ messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
+ }
+ catch (AMQException e)
+ {
+ messageSize = 0;
+ }
+
+
+ if (messageSize >= maximumMessageSize)
+ {
+ listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]");
+ return true;
+ }
+ }
+ return false;
+ }
+
+ },
+ QUEUE_DEPTH_ALERT
+ {
+ boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+ // Check for threshold queue depth in bytes
+ final long maximumQueueDepth = queue.getMaximumQueueDepth();
+
+ if(maximumQueueDepth != 0)
+ {
+ final long queueDepth = queue.getQueueDepth();
+
+ if (queueDepth >= maximumQueueDepth)
+ {
+ listener.notifyClients(this, queue, (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached.");
+ return true;
+ }
+ }
+ return false;
+ }
+
+ },
+ MESSAGE_AGE_ALERT
+ {
+ boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+
+ final long maxMessageAge = queue.getMaximumMessageAge();
+ if(maxMessageAge != 0)
+ {
+ final long currentTime = System.currentTimeMillis();
+ final long thresholdTime = currentTime - maxMessageAge;
+ final long firstArrivalTime = queue.getOldestMessageArrivalTime();
+
+ if(firstArrivalTime < thresholdTime)
+ {
+ long oldestAge = currentTime - firstArrivalTime;
+ listener.notifyClients(this, queue, (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached.");
+
+ return true;
+ }
+ }
+ return false;
+
+ }
+
+ }
+ ;
+
+ private final boolean _messageSpecific;
+
+ NotificationCheck()
+ {
+ this(false);
+ }
+
+ NotificationCheck(boolean messageSpecific)
+ {
+ _messageSpecific = messageSpecific;
+ }
+
+ public boolean isMessageSpecific()
+ {
+ return _messageSpecific;
+ }
+
+ abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener);
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java new file mode 100644 index 0000000000..9554d34f00 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java @@ -0,0 +1,23 @@ +/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+public interface QueueNotificationListener
+{
+ void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg);
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java index 0abf464d15..8675a7249a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java @@ -49,6 +49,8 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle private final MessageStore _messageStore; + private long _arrivalTime; + public WeakReferenceMessageHandle(MessageStore messageStore) { @@ -60,14 +62,27 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle ContentHeaderBody chb = (_contentHeaderBody != null ? _contentHeaderBody.get() : null); if (chb == null) { - MessageMetaData mmd = _messageStore.getMessageMetaData(messageId); + MessageMetaData mmd = loadMessageMetaData(messageId); chb = mmd.getContentHeaderBody(); - _contentHeaderBody = new WeakReference<ContentHeaderBody>(chb); - _publishBody = new WeakReference<BasicPublishBody>(mmd.getPublishBody()); } return chb; } + private MessageMetaData loadMessageMetaData(Long messageId) + throws AMQException + { + MessageMetaData mmd = _messageStore.getMessageMetaData(messageId); + populateFromMessageMetaData(mmd); + return mmd; + } + + private void populateFromMessageMetaData(MessageMetaData mmd) + { + _arrivalTime = mmd.getArrivalTime(); + _contentHeaderBody = new WeakReference<ContentHeaderBody>(mmd.getContentHeaderBody()); + _publishBody = new WeakReference<BasicPublishBody>(mmd.getPublishBody()); + } + public int getBodyCount(Long messageId) throws AMQException { if (_contentBodies == null) @@ -136,10 +151,9 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle BasicPublishBody bpb = (_publishBody != null ? _publishBody.get() : null); if (bpb == null) { - MessageMetaData mmd = _messageStore.getMessageMetaData(messageId); + MessageMetaData mmd = loadMessageMetaData(messageId); + bpb = mmd.getPublishBody(); - _publishBody = new WeakReference<BasicPublishBody>(bpb); - _contentHeaderBody = new WeakReference<ContentHeaderBody>(mmd.getContentHeaderBody()); } return bpb; } @@ -179,10 +193,15 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle { _contentBodies = new LinkedList<WeakReference<ContentBody>>(); } - _messageStore.storeMessageMetaData(storeContext, messageId, new MessageMetaData(publishBody, contentHeaderBody, - _contentBodies.size())); - _publishBody = new WeakReference<BasicPublishBody>(publishBody); - _contentHeaderBody = new WeakReference<ContentHeaderBody>(contentHeaderBody); + + final long arrivalTime = System.currentTimeMillis(); + + + MessageMetaData mmd = new MessageMetaData(publishBody, contentHeaderBody, _contentBodies.size(), arrivalTime); + + _messageStore.storeMessageMetaData(storeContext, messageId, mmd); + + populateFromMessageMetaData(mmd); } public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException @@ -199,4 +218,10 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle { _messageStore.dequeueMessage(storeContext, queue.getName(), messageId); } + + public long getArrivalTime() + { + return _arrivalTime; + } + } |