summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-04-13 11:56:49 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-04-13 11:56:49 +0000
commit78ac305ea8a8d47467d32a80518b8d830e5116af (patch)
tree38459c4e2c939520daa0eb11775f21ed98511ab3
parentb8ee522006dc367c8c1e481cdf5cf2874dd74d88 (diff)
downloadqpid-python-78ac305ea8a8d47467d32a80518b8d830e5116af.tar.gz
QPID-430: Fix message age alerting so that it works on queues which are otherwise inactive.
AMQQueue, VirtualHost, MockAMQQueue: change name of removeExpiredIfNoSubscribers to checkMessageStatus. AMQQueueMBean: remove unthrown exception SimpleAMQQueue: add notification checks to checkMessageStatus, remove catch for JMException which checkForNotification no longer throws. NullApplicationRegistry: set small housekeeping check period so that it runs freuqently and tests don't need to sleep for excessive periods of time AMQQueueAlertTest: remove subsequent send, notification alerts shouldn't depend on queue activity. merged from trunk r743357 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5-fix@764421 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java21
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java3
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java3
7 files changed, 19 insertions, 23 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 03ccbe7ce4..f6d406b653 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -149,9 +149,11 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
long clearQueue(StoreContext storeContext) throws AMQException;
-
-
- void removeExpiredIfNoSubscribers() throws AMQException;
+ /**
+ * Checks the status of messages on the queue, purging expired ones, firing age related alerts etc.
+ * @throws AMQException
+ */
+ void checkMessageStatus() throws AMQException;
Set<NotificationCheck> getNotificationChecks();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 2ed6be77c6..27fed58eb2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -246,7 +246,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
/**
* Checks if there is any notification to be send to the listeners
*/
- public void checkForNotification(AMQMessage msg) throws AMQException, JMException
+ public void checkForNotification(AMQMessage msg) throws AMQException
{
final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 7e7e8b2114..547df7856d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -423,17 +423,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
deliverAsync();
}
- try
- {
- _managedObject.checkForNotification(entry.getMessage());
- }
- catch (JMException e)
- {
- throw new AMQException("Unable to get notification from manage queue: " + e, e);
- }
-
+ _managedObject.checkForNotification(entry.getMessage());
+
return entry;
-
}
private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
@@ -1431,7 +1423,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
- public void removeExpiredIfNoSubscribers() throws AMQException
+ @Override
+ public void checkMessageStatus() throws AMQException
{
final StoreContext storeContext = new StoreContext();
@@ -1443,10 +1436,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
QueueEntry node = queueListIterator.getNode();
if (!node.isDeleted() && node.expired() && node.acquire())
{
-
node.discard(storeContext);
+ }
+ else
+ {
+ _managedObject.checkForNotification(node.getMessage());
}
-
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
index 83b18e7a47..88ad87b9c1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
@@ -49,7 +49,8 @@ public class NullApplicationRegistry extends ApplicationRegistry
_logger.info("Initialising NullApplicationRegistry");
_configuration.addProperty("store.class", "org.apache.qpid.server.store.MemoryMessageStore");
-
+ _configuration.addProperty("housekeeping.expiredMessageCheckPeriod", "200");
+
Properties users = new Properties();
users.put("guest", "guest");
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 1a0d0ce8de..de4c8ac1ff 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -208,7 +208,7 @@ public class VirtualHost implements Accessable
try
{
- q.removeExpiredIfNoSubscribers();
+ q.checkMessageStatus();
}
catch (AMQException e)
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index 0ada9cefee..c16fde0134 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -167,9 +167,6 @@ public class AMQQueueAlertTest extends TestCase
// Ensure message sits on queue long enough to age.
Thread.sleep(MAX_MESSAGE_AGE * 2);
- sendMessages(1, MAX_MESSAGE_SIZE);
- assertTrue(_queueMBean.getMessageCount() == 2);
-
Notification lastNotification = _queueMBean.getLastNotification();
assertNotNull(lastNotification);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 3fc26a6f08..758c8ddb2e 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -277,7 +277,8 @@ public class MockAMQQueue implements AMQQueue
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
- public void removeExpiredIfNoSubscribers() throws AMQException
+ @Override
+ public void checkMessageStatus() throws AMQException
{
//To change body of implemented methods use File | Settings | File Templates.
}