diff options
author | Aidan Skinner <aidan@apache.org> | 2008-08-21 13:53:28 +0000 |
---|---|---|
committer | Aidan Skinner <aidan@apache.org> | 2008-08-21 13:53:28 +0000 |
commit | 4ea404d310f53d8bd9a3342d568108f6f1111a9a (patch) | |
tree | 1753d08fd28fa35df86ea09ed9c8a4dbdb4a8b6a | |
parent | 48c6f80d5cb49715256d1e192c0ab13fcea3d245 (diff) | |
download | qpid-python-4ea404d310f53d8bd9a3342d568108f6f1111a9a.tar.gz |
QPID-1167: reset queue notification lists when creating queues. Pull out defaults centrally.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@687764 13f79535-47bb-0310-9956-ffa450edef68
7 files changed, 73 insertions, 25 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index 88d5360f3e..fc6057afd2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -183,13 +183,6 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr _messageStore.createQueue(queue); } - Configuration virtualHostDefaultQueueConfiguration = - VirtualHostConfiguration.getDefaultQueueConfiguration(queue); - if (virtualHostDefaultQueueConfiguration != null) - { - Configurator.configure(queue, virtualHostDefaultQueueConfiguration); - } - _queueRegistry.registerQueue(queue); } catch (AMQException ex) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index bd3e5b1f72..2ee8f54a2b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -130,13 +130,13 @@ public class VirtualHostConfiguration } } - public static CompositeConfiguration getDefaultQueueConfiguration(AMQQueue queue) + public static CompositeConfiguration getDefaultQueueConfiguration(VirtualHost host) { CompositeConfiguration queueConfiguration = null; if (_config == null) return null; - Configuration vHostConfiguration = _config.subset(VIRTUALHOST_PROPERTY_BASE + queue.getVirtualHost().getName()); + Configuration vHostConfiguration = _config.subset(VIRTUALHOST_PROPERTY_BASE + host.getName()); if (vHostConfiguration == null) return null; @@ -193,7 +193,10 @@ public class VirtualHostConfiguration queue = AMQQueueFactory.createAMQQueueImpl(queueName, durable, owner == null ? null : new AMQShortString(owner) /* These queues will have no owner */, - autodelete /* Therefore autodelete makes no sence */, virtualHost, arguments); + autodelete /* Therefore autodelete makes no sence */, + virtualHost, + arguments, + queueConfiguration); if (queue.isDurable()) { @@ -247,10 +250,6 @@ public class VirtualHostConfiguration } } - - - - Configurator.configure(queue, queueConfiguration); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 447482ccf3..3047643021 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -203,13 +203,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar } }); }// if exclusive and not durable - - Configuration virtualHostDefaultQueueConfiguration = VirtualHostConfiguration.getDefaultQueueConfiguration(queue); - if (virtualHostDefaultQueueConfiguration != null) - { - Configurator.configure(queue, virtualHostDefaultQueueConfiguration); - } - + return queue; } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index c9c252f06d..90d7109df8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.queue; +import org.apache.commons.configuration.CompositeConfiguration; +import org.apache.commons.configuration.Configuration; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.exchange.Exchange; @@ -207,4 +209,6 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> { public void doTask(AMQQueue queue) throws AMQException; } + + void configure(Configuration virtualHostDefaultQueueConfiguration); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 9dfc4449bb..19e98f416d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -20,8 +20,10 @@ */ package org.apache.qpid.server.queue; +import org.apache.commons.configuration.Configuration; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.AMQException; @@ -31,22 +33,43 @@ public class AMQQueueFactory public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities"); public static AMQQueue createAMQQueueImpl(AMQShortString name, + boolean durable, + AMQShortString owner, + boolean autoDelete, + VirtualHost virtualHost, final FieldTable arguments) + + throws AMQException + { + + return createAMQQueueImpl(name, durable, owner, autoDelete, + virtualHost, arguments, + VirtualHostConfiguration.getDefaultQueueConfiguration(virtualHost)); + } + + public static AMQQueue createAMQQueueImpl(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, - VirtualHost virtualHost, final FieldTable arguments) + VirtualHost virtualHost, final FieldTable arguments, + Configuration queueConfiguration) throws AMQException { final int priorities = arguments == null ? 1 : arguments.containsKey(X_QPID_PRIORITIES) ? arguments.getInteger(X_QPID_PRIORITIES) : 1; + AMQQueue q = null; if(priorities > 1) { - return new AMQPriorityQueue(name, durable, owner, autoDelete, virtualHost, priorities); + q = new AMQPriorityQueue(name, durable, owner, autoDelete, virtualHost, priorities); } else { - return new SimpleAMQQueue(name, durable, owner, autoDelete, virtualHost); + q = new SimpleAMQQueue(name, durable, owner, autoDelete, virtualHost); + } + if (q != null && queueConfiguration != null) + { + q.configure(queueConfiguration); } + return q; } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index b0f700d4a1..29c3f68286 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -3,6 +3,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.MessageStore; @@ -14,6 +15,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.pool.ReadWriteRunnable; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.configuration.Configured; +import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import javax.management.JMException; @@ -160,12 +162,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throw new AMQException("AMQQueue MBean creation has failed ", e); } + resetNotifications(); + + } + + private void resetNotifications() + { // This ensure that the notification checks for the configured alerts are created. setMaximumMessageAge(_maximumMessageAge); setMaximumMessageCount(_maximumMessageCount); setMaximumMessageSize(_maximumMessageSize); setMaximumQueueDepth(_maximumQueueDepth); - } // ------ Getters and Setters @@ -1635,4 +1642,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } return ids; } + + public void configure(Configuration queueConfiguration) + { + Configurator.configure(this, queueConfiguration); + resetNotifications(); + } }
\ No newline at end of file diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index 5eafc18378..0ada9cefee 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -40,6 +40,7 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.commons.configuration.CompositeConfiguration; import org.apache.mina.common.ByteBuffer; import javax.management.Notification; @@ -47,6 +48,7 @@ import javax.management.Notification; import java.util.ArrayList; import java.util.LinkedList; import java.util.Collections; +import java.util.Set; /** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */ public class AMQQueueAlertTest extends TestCase @@ -251,6 +253,26 @@ public class AMQQueueAlertTest extends TestCase assertEquals(new Long(0), new Long(_queueMBean.getQueueDepth())); } + public void testAlertConfiguration() throws AMQException + { + // Setup configuration + CompositeConfiguration config = new CompositeConfiguration(); + config.setProperty("maximumMessageSize", new Long(23)); + config.setProperty("maximumMessageCount", new Long(24)); + config.setProperty("maximumQueueDepth", new Long(25)); + config.setProperty("maximumMessageAge", new Long(26)); + + // Create queue and set config + _queue = getNewQueue(); + _queue.configure(config); + + // Check alerts and notifications + Set<NotificationCheck> checks = _queue.getNotificationChecks(); + assertNotNull("No checks found", checks); + assertFalse("Checks should not be empty", checks.isEmpty()); + assertEquals("Wrong number of checks", 4, checks.size()); + } + protected IncomingMessage message(final boolean immediate, long size) throws AMQException { MessagePublishInfo publish = new MessagePublishInfo() |