summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-02-05 09:40:04 +0000
committerRobert Greig <rgreig@apache.org>2007-02-05 09:40:04 +0000
commit8949f938b39c2a2235f31bb2035174eedc9ba7b7 (patch)
tree6619a2926e7946124513d2dee0f3b79ae19a4df2
parent3da36fe4417019b417217e8feb22b8de11cbed0a (diff)
downloadqpid-python-8949f938b39c2a2235f31bb2035174eedc9ba7b7.tar.gz
QPID-326 : Patch supplied by Rob Godfrey - add oldest message on queue notification, and log notifications in log file
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@503604 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/etc/virtualhosts.xml79
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java203
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java76
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java44
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java18
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java135
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java23
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java45
15 files changed, 482 insertions, 173 deletions
diff --git a/java/broker/etc/virtualhosts.xml b/java/broker/etc/virtualhosts.xml
index 50cddb5661..2a573535de 100644
--- a/java/broker/etc/virtualhosts.xml
+++ b/java/broker/etc/virtualhosts.xml
@@ -21,18 +21,79 @@
-->
<virtualhosts>
<virtualhost>
- <path>localhost</path>
- <bind>direct://amq.direct//queue</bind>
- <bind>direct://amq.direct//ping</bind>
+ <name>localhost</name>
+
+ <localhost>
+ <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+ <maximumMessageCount>5000</maximumMessageCount>
+ <queue>
+ <name>queue</name>
+ <queue>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </queue>
+ </queue>
+ <queue>
+ <name>ping</name>
+ <ping>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </ping>
+ </queue>
+ </localhost>
</virtualhost>
<virtualhost>
- <path>development</path>
- <bind>direct://amq.direct//queue</bind>
- <bind>direct://amq.direct//ping</bind>
+ <name>development</name>
+ <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+ <maximumMessageCount>5000</maximumMessageCount>
+ <development>
+ <queue>
+ <name>queue</name>
+ <queue>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </queue>
+ </queue>
+ <queue>
+ <name>ping</name>
+ <ping>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </ping>
+ </queue>
+ </development>
</virtualhost>
<virtualhost>
- <path>test</path>
- <bind>direct://amq.direct//queue</bind>
- <bind>direct://amq.direct//ping</bind>
+ <name>test</name>
+ <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+ <maximumMessageCount>5000</maximumMessageCount>
+ <test>
+ <queue>
+ <name>queue</name>
+ <queue>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </queue>
+ </queue>
+ <queue>
+ <name>ping</name>
+ <ping>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </ping>
+ </queue>
+ </test>
</virtualhost>
</virtualhosts>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index 361a21b284..c191bef447 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -34,9 +34,13 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.log4j.Logger;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.CompositeConfiguration;
import java.util.Collection;
+import java.util.List;
+import java.util.Collections;
public class VirtualHostConfiguration
{
@@ -44,11 +48,7 @@ public class VirtualHostConfiguration
XMLConfiguration _config;
- private static final String XML_VIRTUALHOST = "virtualhost";
- private static final String XML_PATH = "path";
- private static final String XML_BIND = "bind";
- private static final String XML_VIRTUALHOST_PATH = "virtualhost.path";
- private static final String XML_VIRTUALHOST_BIND = "virtualhost.bind";
+ private static final String VIRTUALHOST_PROPERTY_BASE = "virtualhost.";
public VirtualHostConfiguration(String configFile) throws ConfigurationException
@@ -57,137 +57,66 @@ public class VirtualHostConfiguration
_config = new XMLConfiguration(configFile);
- if (_config.getProperty(XML_VIRTUALHOST_PATH) == null)
- {
- throw new ConfigurationException(
- "Virtualhost Configuration document does not contain a valid virtualhost.");
- }
}
- public void performBindings() throws AMQException, ConfigurationException, URLSyntaxException
- {
- Object prop = _config.getProperty(XML_VIRTUALHOST_PATH);
-
- if (prop instanceof Collection)
- {
- _logger.debug("Number of VirtualHosts: " + ((Collection) prop).size());
- int virtualhosts = ((Collection) prop).size();
- for (int vhost = 0; vhost < virtualhosts; vhost++)
- {
- loadVirtualHost(vhost);
- }
- }
- else
- {
- loadVirtualHost(-1);
- }
- }
- private void loadVirtualHost(int index) throws AMQException, ConfigurationException, URLSyntaxException
+ private void configureVirtualHost(String virtualHostName, Configuration configuration) throws ConfigurationException, AMQException
{
- String path = XML_VIRTUALHOST;
-
- if (index != -1)
- {
- path = path + "(" + index + ")";
- }
-
- Object prop = _config.getProperty(path + "." + XML_PATH);
-
- if (prop == null)
- {
- prop = _config.getProperty(path + "." + XML_BIND);
- String error = "Virtual Host not defined for binding";
-
- if (prop != null)
- {
- if (prop instanceof Collection)
- {
- error += "s";
- }
+ _logger.debug("Loding configuration for virtaulhost: "+virtualHostName);
- error += ": " + prop;
- }
- throw new ConfigurationException(error);
- }
+ VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(virtualHostName);
- _logger.info("VirtualHost:'" + prop + "'");
- String virtualHost = prop.toString();
- prop = _config.getProperty(path + "." + XML_BIND);
- if (prop instanceof Collection)
- {
- int bindings = ((Collection) prop).size();
- _logger.debug("Number of Bindings: " + bindings);
- for (int dest = 0; dest < bindings; dest++)
- {
- loadBinding(virtualHost, path, dest);
- }
- }
- else
- {
- loadBinding(virtualHost,path, -1);
- }
- }
- private void loadBinding(String virtualHost, String rootpath, int index) throws AMQException, ConfigurationException, URLSyntaxException
- {
- String path = rootpath + "." + XML_BIND;
- if (index != -1)
+ if(virtualHost == null)
{
- path = path + "(" + index + ")";
+ throw new ConfigurationException("Unknown virtual host: " + virtualHostName);
}
- String bindingString = _config.getString(path);
-
- AMQBindingURL binding = new AMQBindingURL(bindingString);
+ List queueNames = configuration.getList("queue.name");
- _logger.debug("Loaded Binding:" + binding);
-
- try
- {
- bind(virtualHost, binding);
- }
- catch (AMQException amqe)
+ for(Object queueNameObj : queueNames)
{
- _logger.info("Unable to bind url: " + binding);
- throw amqe;
+ String queueName = String.valueOf(queueNameObj);
+ configureQueue(virtualHost, queueName, configuration);
}
+
}
- private void bind(String virtualHostName, AMQBindingURL binding) throws AMQException, ConfigurationException
+ private void configureQueue(VirtualHost virtualHost, String queueNameString, Configuration configuration) throws AMQException, ConfigurationException
{
+ CompositeConfiguration queueConfiguration = new CompositeConfiguration();
- AMQShortString queueName = binding.getQueueName();
+ queueConfiguration.addConfiguration(configuration.subset("queue."+ queueNameString));
+ queueConfiguration.addConfiguration(configuration);
- // This will occur if the URL is a Topic
- if (queueName == null)
- {
- //todo register valid topic
- ///queueName = binding.getDestinationName();
- throw new AMQException("Topics cannot be bound. TODO Register valid topic");
- }
-
- //Get references to Broker Registries
- VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(virtualHostName);
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
MessageStore messageStore = virtualHost.getMessageStore();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+
+ AMQShortString queueName = new AMQShortString(queueNameString);
+
+ AMQQueue queue;
+
synchronized (queueRegistry)
{
- AMQQueue queue = queueRegistry.getQueue(queueName);
+ queue = queueRegistry.getQueue(queueName);
if (queue == null)
{
- _logger.info("Queue '" + binding.getQueueName() + "' does not exists. Creating.");
+ _logger.info("Creating queue '" + queueName + "' on virtual host " + virtualHost.getName());
+
+ boolean durable = queueConfiguration.getBoolean("durable" ,false);
+ boolean autodelete = queueConfiguration.getBoolean("autodelete", false);
+ String owner = queueConfiguration.getString("owner", null);
queue = new AMQQueue(queueName,
- Boolean.parseBoolean(binding.getOption(AMQBindingURL.OPTION_DURABLE)),
- null /* These queues will have no owner */,
- false /* Therefore autodelete makes no sence */, virtualHost);
+ durable,
+ owner == null ? null : new AMQShortString(owner) /* These queues will have no owner */,
+ autodelete /* Therefore autodelete makes no sence */, virtualHost);
if (queue.isDurable())
{
@@ -198,27 +127,69 @@ public class VirtualHostConfiguration
}
else
{
- _logger.info("Queue '" + binding.getQueueName() + "' already exists not creating.");
+ _logger.info("Queue '" + queueNameString + "' already exists on virtual host "+virtualHost.getName()+", not creating.");
}
- Exchange defaultExchange = exchangeRegistry.getExchange(binding.getExchangeName());
- synchronized (defaultExchange)
+ String exchangeName = queueConfiguration.getString("exchange", null);
+
+ Exchange exchange = exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName));
+
+ if(exchange == null)
{
- if (defaultExchange == null)
- {
- throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + binding);
- }
+ exchange = virtualHost.getExchangeRegistry().getDefaultExchange();
+ }
- defaultExchange.registerQueue(queue.getName(), queue, null);
+ if (exchange == null)
+ {
+ throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName);
+ }
- if (binding.getRoutingKey() == null || binding.getRoutingKey().equals(""))
+ synchronized (exchange)
+ {
+ List routingKeys = queueConfiguration.getList("routingKey");
+ if(routingKeys == null || routingKeys.isEmpty())
{
- throw new ConfigurationException("Unknown binding not specified on url:" + binding);
+ routingKeys = Collections.singletonList(queue.getName());
}
- queue.bind(binding.getRoutingKey(), defaultExchange);
+ for(Object routingKeyNameObj : routingKeys)
+ {
+ AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj));
+ exchange.registerQueue(routingKey, queue, null);
+
+ queue.bind(routingKey, exchange);
+
+ _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'");
+ }
}
- _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + binding.getExchangeName() + " RK:'" + binding.getRoutingKey() + "'");
+
}
+
+
+
+ Configurator.configure(queue, queueConfiguration);
}
+
+
+ public void performBindings() throws AMQException, ConfigurationException
+ {
+ List virtualHostNames = _config.getList(VIRTUALHOST_PROPERTY_BASE + "name");
+
+ _logger.info("Configuring " + virtualHostNames == null ? 0 : virtualHostNames.size() + " virtual hosts: " + virtualHostNames);
+
+ for(Object nameObject : virtualHostNames)
+ {
+ String name = String.valueOf(nameObject);
+ configureVirtualHost(name, _config.subset(VIRTUALHOST_PROPERTY_BASE + name));
+ }
+
+ if (virtualHostNames == null || virtualHostNames.isEmpty())
+ {
+ throw new ConfigurationException(
+ "Virtualhost Configuration document does not contain a valid virtualhost.");
+ }
+ }
+
+
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index 374772bc4a..cb3b1eafa6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -67,6 +67,11 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
_defaultExchange = exchange;
}
+ public Exchange getDefaultExchange()
+ {
+ return _defaultExchange;
+ }
+
public void unregisterExchange(AMQShortString name, boolean inUse) throws AMQException
{
// TODO: check inUse argument
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
index 24884d20d7..a022b86299 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
@@ -40,4 +40,6 @@ public interface ExchangeRegistry extends MessageRouter
Exchange getExchange(AMQShortString name);
void setDefaultExchange(Exchange exchange);
+
+ Exchange getDefaultExchange();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 7a16901796..71fecc45fa 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -509,6 +509,12 @@ public class AMQMessage
_messageHandle.setRedelivered(redelivered);
}
+ public long getArrivalTime()
+ {
+ return _messageHandle.getArrivalTime();
+ }
+
+
/**
* Called when this message is delivered to a consumer. (used to
* implement the 'immediate' flag functionality).
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
index d788d1b9e2..f07706c2e6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
@@ -74,4 +74,6 @@ public interface AMQMessageHandle
void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException;
void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException;
+
+ long getArrivalTime();
} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index ab7bbefe92..aa372a3b99 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.exchange.Exchange;
@@ -35,7 +36,6 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import javax.management.JMException;
import java.text.MessageFormat;
import java.util.List;
-import java.util.ArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
@@ -130,22 +130,36 @@ public class AMQQueue implements Managable, Comparable
/**
* max allowed size(KB) of a single message
*/
- private long _maxMessageSize = 10000;
+ private long _maximumMessageSize = 10000;
/**
* max allowed number of messages on a queue.
*/
- private Integer _maxMessageCount = 10000;
+ @Configured(path = "maximumMessageCount", defaultValue = "0")
+ public int _maximumMessageCount;
/**
- * max queue depth(KB) for the queue
+ * max queue depth for the queue
*/
- private long _maxQueueDepth = 10000000;
+ @Configured(path = "maximumQueueDepth", defaultValue = "0")
+ public long _maximumQueueDepth = 10000000;
+
+/*
+ * maximum message age before alerts occur
+ */
+ @Configured(path = "maximumMessageAge", defaultValue = "0")
+ public long _maximumMessageAge = 30000; //0
+
+ /*
+ * the minimum interval between sending out consequetive alerts of the same type
+ */
+ @Configured(path = "minimumAlertRepeatGap", defaultValue = "0")
+ public long _minimumAlertRepeatGap = 30000;
/**
* total messages received by the queue since startup.
*/
- private long _totalMessagesReceived = 0;
+ public long _totalMessagesReceived = 0;
public int compareTo(Object o)
{
@@ -286,50 +300,56 @@ public class AMQQueue implements Managable, Comparable
return _managedObject;
}
- public Long getMaximumMessageSize()
+ public long getMaximumMessageSize()
{
- return _maxMessageSize;
+ return _maximumMessageSize;
}
- public void setMaximumMessageSize(Long value)
+ public void setMaximumMessageSize(long value)
{
- _maxMessageSize = value;
+ _maximumMessageSize = value;
}
- public Integer getConsumerCount()
+ public int getConsumerCount()
{
return _subscribers.size();
}
- public Integer getActiveConsumerCount()
+ public int getActiveConsumerCount()
{
return _subscribers.getWeight();
}
- public Long getReceivedMessageCount()
+ public long getReceivedMessageCount()
{
return _totalMessagesReceived;
}
- public Integer getMaximumMessageCount()
+ public int getMaximumMessageCount()
{
- return _maxMessageCount;
+ return _maximumMessageCount;
}
- public void setMaximumMessageCount(Integer value)
+ public void setMaximumMessageCount(int value)
{
- _maxMessageCount = value;
+ _maximumMessageCount = value;
}
public long getMaximumQueueDepth()
{
- return _maxQueueDepth;
+ return _maximumQueueDepth;
}
// Sets the queue depth, the max queue size
public void setMaximumQueueDepth(long value)
{
- _maxQueueDepth = value;
+ _maximumQueueDepth = value;
+ }
+
+ public long getOldestMessageArrivalTime()
+ {
+ return _deliveryMgr.getOldestMessageArrival();
+
}
/**
@@ -631,6 +651,24 @@ public class AMQQueue implements Managable, Comparable
_deleteTaskList.add(task);
}
+ public long getMinimumAlertRepeatGap()
+ {
+ return _minimumAlertRepeatGap;
+ }
+
+ public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap)
+ {
+ _minimumAlertRepeatGap = minimumAlertRepeatGap;
+ }
+ public long getMaximumMessageAge()
+ {
+ return _maximumMessageAge;
+ }
+
+ public void setMaximumMessageAge(long maximumMessageAge)
+ {
+ _maximumMessageAge = maximumMessageAge;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index e15dc648f7..bee2e3950c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -22,12 +22,12 @@ import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.Main;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.mina.common.ByteBuffer;
+import org.apache.log4j.Logger;
import javax.management.openmbean.*;
import javax.management.*;
@@ -41,8 +41,11 @@ import java.util.Iterator;
* for an AMQQueue.
*/
@MBeanDescription("Management Interface for AMQQueue")
-public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
+public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
{
+
+ private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class);
+
/**
* Since the MBean is not associated with a real channel we can safely create our own store context
* for use in the few methods that require one.
@@ -63,6 +66,9 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
private final static String[] _msgContentAttributes = {"AMQ MessageId", "MimeType", "Encoding", "Content"};
private static OpenType[] _msgContentAttributeTypes = new OpenType[4];
+
+ private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
+
@MBeanConstructor("Creates an MBean exposing an AMQQueue")
public AMQQueueMBean(AMQQueue queue) throws JMException
{
@@ -213,38 +219,38 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
return msg.getContentHeaderBody().bodySize;
}
+
+
/**
* Checks if there is any notification to be send to the listeners
*/
public void checkForNotification(AMQMessage msg) throws AMQException, JMException
{
- // Check for threshold message count
- Integer msgCount = getMessageCount();
- if (msgCount >= getMaximumMessageCount())
- {
- notifyClients("Message count(" + msgCount + ") has reached or exceeded the threshold high value");
- }
- // Check for threshold message size
- long messageSize = getMessageSize(msg);
- if (messageSize >= _queue.getMaximumMessageSize())
- {
- notifyClients("Message size(ID=" + msg.getMessageId() + ", size=" + messageSize + " bytes) is higher than the threshold value");
- }
+ final long currentTime = System.currentTimeMillis();
+ final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
- // Check for threshold queue depth in bytes
- long queueDepth = getQueueDepthKb();
- if (queueDepth >= _queue.getMaximumQueueDepth())
+ for(NotificationCheck check : NotificationCheck.values())
{
- notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value");
+ if(check.isMessageSpecific() || _lastNotificationTimes[check.ordinal()]<thresholdTime)
+ {
+ if(check.notifyIfNecessary(msg, _queue, this))
+ {
+ _lastNotificationTimes[check.ordinal()] = currentTime;
+ }
+ }
}
+
}
/**
* Sends the notification to the listeners
*/
- private void notifyClients(String notificationMsg)
+ public void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg)
{
+ // important : add log to the log file - monitoring tools may be looking for this
+ _logger.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg);
+
Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 2103d79310..3a9ce64c57 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -160,6 +160,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return _totalMessageSize.get();
}
+ public long getOldestMessageArrival()
+ {
+ AMQMessage msg = _messages.peek();
+ return msg == null ? Long.MAX_VALUE : msg.getArrivalTime();
+ }
+
public synchronized List<AMQMessage> getMessages()
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
index 7d7ede0732..f7820e1465 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
@@ -83,4 +83,6 @@ interface DeliveryManager
boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException;
long getTotalMessageSize();
+
+ long getOldestMessageArrival();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
index 186a5c8753..4f7dc21598 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
@@ -43,6 +43,8 @@ public class InMemoryMessageHandle implements AMQMessageHandle
private boolean _redelivered;
+ private long _arrivalTime;
+
public InMemoryMessageHandle()
{
}
@@ -114,6 +116,7 @@ public class InMemoryMessageHandle implements AMQMessageHandle
{
_publishBody = publishBody;
_contentHeaderBody = contentHeaderBody;
+ _arrivalTime = System.currentTimeMillis();
}
public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
@@ -130,4 +133,10 @@ public class InMemoryMessageHandle implements AMQMessageHandle
{
// NO OP
}
+
+ public long getArrivalTime()
+ {
+ return _arrivalTime;
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
index deed18c188..a66a85e54d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
@@ -31,11 +31,19 @@ public class MessageMetaData
private int _contentChunkCount;
+ private long _arrivalTime;
+
public MessageMetaData(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount)
{
+ this(publishBody,contentHeaderBody, contentChunkCount, System.currentTimeMillis());
+ }
+
+ public MessageMetaData(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount, long arrivalTime)
+ {
_contentHeaderBody = contentHeaderBody;
_publishBody = publishBody;
_contentChunkCount = contentChunkCount;
+ _arrivalTime = arrivalTime;
}
public int getContentChunkCount()
@@ -67,4 +75,14 @@ public class MessageMetaData
{
_publishBody = publishBody;
}
+
+ public long getArrivalTime()
+ {
+ return _arrivalTime;
+ }
+
+ public void setArrivalTime(long arrivalTime)
+ {
+ _arrivalTime = arrivalTime;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
new file mode 100644
index 0000000000..bc8e1232a7
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
@@ -0,0 +1,135 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+public enum NotificationCheck
+{
+
+ MESSAGE_COUNT_ALERT
+ {
+ boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+ int msgCount = queue.getMessageCount();
+ final int maximumMessageCount = queue.getMaximumMessageCount();
+ if (maximumMessageCount!= 0 && msgCount >= maximumMessageCount)
+ {
+ listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
+ return true;
+ }
+ return false;
+ }
+ },
+ MESSAGE_SIZE_ALERT(true)
+ {
+ boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+ final long maximumMessageSize = queue.getMaximumMessageSize();
+ if(maximumMessageSize != 0)
+ {
+ // Check for threshold message size
+ long messageSize;
+ try
+ {
+ messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
+ }
+ catch (AMQException e)
+ {
+ messageSize = 0;
+ }
+
+
+ if (messageSize >= maximumMessageSize)
+ {
+ listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]");
+ return true;
+ }
+ }
+ return false;
+ }
+
+ },
+ QUEUE_DEPTH_ALERT
+ {
+ boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+ // Check for threshold queue depth in bytes
+ final long maximumQueueDepth = queue.getMaximumQueueDepth();
+
+ if(maximumQueueDepth != 0)
+ {
+ final long queueDepth = queue.getQueueDepth();
+
+ if (queueDepth >= maximumQueueDepth)
+ {
+ listener.notifyClients(this, queue, (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached.");
+ return true;
+ }
+ }
+ return false;
+ }
+
+ },
+ MESSAGE_AGE_ALERT
+ {
+ boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+
+ final long maxMessageAge = queue.getMaximumMessageAge();
+ if(maxMessageAge != 0)
+ {
+ final long currentTime = System.currentTimeMillis();
+ final long thresholdTime = currentTime - maxMessageAge;
+ final long firstArrivalTime = queue.getOldestMessageArrivalTime();
+
+ if(firstArrivalTime < thresholdTime)
+ {
+ long oldestAge = currentTime - firstArrivalTime;
+ listener.notifyClients(this, queue, (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached.");
+
+ return true;
+ }
+ }
+ return false;
+
+ }
+
+ }
+ ;
+
+ private final boolean _messageSpecific;
+
+ NotificationCheck()
+ {
+ this(false);
+ }
+
+ NotificationCheck(boolean messageSpecific)
+ {
+ _messageSpecific = messageSpecific;
+ }
+
+ public boolean isMessageSpecific()
+ {
+ return _messageSpecific;
+ }
+
+ abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener);
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
new file mode 100644
index 0000000000..9554d34f00
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
@@ -0,0 +1,23 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+public interface QueueNotificationListener
+{
+ void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg);
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
index 0abf464d15..8675a7249a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
@@ -49,6 +49,8 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
private final MessageStore _messageStore;
+ private long _arrivalTime;
+
public WeakReferenceMessageHandle(MessageStore messageStore)
{
@@ -60,14 +62,27 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
ContentHeaderBody chb = (_contentHeaderBody != null ? _contentHeaderBody.get() : null);
if (chb == null)
{
- MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
+ MessageMetaData mmd = loadMessageMetaData(messageId);
chb = mmd.getContentHeaderBody();
- _contentHeaderBody = new WeakReference<ContentHeaderBody>(chb);
- _publishBody = new WeakReference<BasicPublishBody>(mmd.getPublishBody());
}
return chb;
}
+ private MessageMetaData loadMessageMetaData(Long messageId)
+ throws AMQException
+ {
+ MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
+ populateFromMessageMetaData(mmd);
+ return mmd;
+ }
+
+ private void populateFromMessageMetaData(MessageMetaData mmd)
+ {
+ _arrivalTime = mmd.getArrivalTime();
+ _contentHeaderBody = new WeakReference<ContentHeaderBody>(mmd.getContentHeaderBody());
+ _publishBody = new WeakReference<BasicPublishBody>(mmd.getPublishBody());
+ }
+
public int getBodyCount(Long messageId) throws AMQException
{
if (_contentBodies == null)
@@ -136,10 +151,9 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
BasicPublishBody bpb = (_publishBody != null ? _publishBody.get() : null);
if (bpb == null)
{
- MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
+ MessageMetaData mmd = loadMessageMetaData(messageId);
+
bpb = mmd.getPublishBody();
- _publishBody = new WeakReference<BasicPublishBody>(bpb);
- _contentHeaderBody = new WeakReference<ContentHeaderBody>(mmd.getContentHeaderBody());
}
return bpb;
}
@@ -179,10 +193,15 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
{
_contentBodies = new LinkedList<WeakReference<ContentBody>>();
}
- _messageStore.storeMessageMetaData(storeContext, messageId, new MessageMetaData(publishBody, contentHeaderBody,
- _contentBodies.size()));
- _publishBody = new WeakReference<BasicPublishBody>(publishBody);
- _contentHeaderBody = new WeakReference<ContentHeaderBody>(contentHeaderBody);
+
+ final long arrivalTime = System.currentTimeMillis();
+
+
+ MessageMetaData mmd = new MessageMetaData(publishBody, contentHeaderBody, _contentBodies.size(), arrivalTime);
+
+ _messageStore.storeMessageMetaData(storeContext, messageId, mmd);
+
+ populateFromMessageMetaData(mmd);
}
public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
@@ -199,4 +218,10 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
{
_messageStore.dequeueMessage(storeContext, queue.getName(), messageId);
}
+
+ public long getArrivalTime()
+ {
+ return _arrivalTime;
+ }
+
}