diff options
author | Martin Ritchie <ritchiem@apache.org> | 2009-04-13 11:56:49 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2009-04-13 11:56:49 +0000 |
commit | 78ac305ea8a8d47467d32a80518b8d830e5116af (patch) | |
tree | 38459c4e2c939520daa0eb11775f21ed98511ab3 /qpid/java | |
parent | b8ee522006dc367c8c1e481cdf5cf2874dd74d88 (diff) | |
download | qpid-python-78ac305ea8a8d47467d32a80518b8d830e5116af.tar.gz |
QPID-430: Fix message age alerting so that it works on queues which are otherwise inactive.
AMQQueue, VirtualHost, MockAMQQueue: change name of removeExpiredIfNoSubscribers to checkMessageStatus.
AMQQueueMBean: remove unthrown exception
SimpleAMQQueue: add notification checks to checkMessageStatus, remove catch for JMException which checkForNotification no longer throws.
NullApplicationRegistry: set small housekeeping check period so that it runs freuqently and tests don't need to sleep for excessive periods of time
AMQQueueAlertTest: remove subsequent send, notification alerts shouldn't depend on queue activity.
merged from trunk r743357
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5-fix@764421 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
7 files changed, 19 insertions, 23 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 03ccbe7ce4..f6d406b653 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -149,9 +149,11 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> long clearQueue(StoreContext storeContext) throws AMQException; - - - void removeExpiredIfNoSubscribers() throws AMQException; + /** + * Checks the status of messages on the queue, purging expired ones, firing age related alerts etc. + * @throws AMQException + */ + void checkMessageStatus() throws AMQException; Set<NotificationCheck> getNotificationChecks(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 2ed6be77c6..27fed58eb2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -246,7 +246,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que /** * Checks if there is any notification to be send to the listeners */ - public void checkForNotification(AMQMessage msg) throws AMQException, JMException + public void checkForNotification(AMQMessage msg) throws AMQException { final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 7e7e8b2114..547df7856d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -423,17 +423,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener deliverAsync(); } - try - { - _managedObject.checkForNotification(entry.getMessage()); - } - catch (JMException e) - { - throw new AMQException("Unable to get notification from manage queue: " + e, e); - } - + _managedObject.checkForNotification(entry.getMessage()); + return entry; - } private void deliverToSubscription(final Subscription sub, final QueueEntry entry) @@ -1431,7 +1423,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } - public void removeExpiredIfNoSubscribers() throws AMQException + @Override + public void checkMessageStatus() throws AMQException { final StoreContext storeContext = new StoreContext(); @@ -1443,10 +1436,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = queueListIterator.getNode(); if (!node.isDeleted() && node.expired() && node.acquire()) { - node.discard(storeContext); + } + else + { + _managedObject.checkForNotification(node.getMessage()); } - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java index 83b18e7a47..88ad87b9c1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java @@ -49,7 +49,8 @@ public class NullApplicationRegistry extends ApplicationRegistry _logger.info("Initialising NullApplicationRegistry"); _configuration.addProperty("store.class", "org.apache.qpid.server.store.MemoryMessageStore"); - + _configuration.addProperty("housekeeping.expiredMessageCheckPeriod", "200"); + Properties users = new Properties(); users.put("guest", "guest"); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 1a0d0ce8de..de4c8ac1ff 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -208,7 +208,7 @@ public class VirtualHost implements Accessable try { - q.removeExpiredIfNoSubscribers(); + q.checkMessageStatus(); } catch (AMQException e) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index 0ada9cefee..c16fde0134 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -167,9 +167,6 @@ public class AMQQueueAlertTest extends TestCase // Ensure message sits on queue long enough to age. Thread.sleep(MAX_MESSAGE_AGE * 2); - sendMessages(1, MAX_MESSAGE_SIZE); - assertTrue(_queueMBean.getMessageCount() == 2); - Notification lastNotification = _queueMBean.getLastNotification(); assertNotNull(lastNotification); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 3fc26a6f08..758c8ddb2e 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -277,7 +277,8 @@ public class MockAMQQueue implements AMQQueue return 0; //To change body of implemented methods use File | Settings | File Templates. } - public void removeExpiredIfNoSubscribers() throws AMQException + @Override + public void checkMessageStatus() throws AMQException { //To change body of implemented methods use File | Settings | File Templates. } |