summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-08-21 13:53:28 +0000
committerAidan Skinner <aidan@apache.org>2008-08-21 13:53:28 +0000
commit4ea404d310f53d8bd9a3342d568108f6f1111a9a (patch)
tree1753d08fd28fa35df86ea09ed9c8a4dbdb4a8b6a
parent48c6f80d5cb49715256d1e192c0ab13fcea3d245 (diff)
downloadqpid-python-4ea404d310f53d8bd9a3342d568108f6f1111a9a.tar.gz
QPID-1167: reset queue notification lists when creating queues. Pull out defaults centrally.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@687764 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java29
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java15
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java22
7 files changed, 73 insertions, 25 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
index 88d5360f3e..fc6057afd2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
@@ -183,13 +183,6 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
_messageStore.createQueue(queue);
}
- Configuration virtualHostDefaultQueueConfiguration =
- VirtualHostConfiguration.getDefaultQueueConfiguration(queue);
- if (virtualHostDefaultQueueConfiguration != null)
- {
- Configurator.configure(queue, virtualHostDefaultQueueConfiguration);
- }
-
_queueRegistry.registerQueue(queue);
}
catch (AMQException ex)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index bd3e5b1f72..2ee8f54a2b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -130,13 +130,13 @@ public class VirtualHostConfiguration
}
}
- public static CompositeConfiguration getDefaultQueueConfiguration(AMQQueue queue)
+ public static CompositeConfiguration getDefaultQueueConfiguration(VirtualHost host)
{
CompositeConfiguration queueConfiguration = null;
if (_config == null)
return null;
- Configuration vHostConfiguration = _config.subset(VIRTUALHOST_PROPERTY_BASE + queue.getVirtualHost().getName());
+ Configuration vHostConfiguration = _config.subset(VIRTUALHOST_PROPERTY_BASE + host.getName());
if (vHostConfiguration == null)
return null;
@@ -193,7 +193,10 @@ public class VirtualHostConfiguration
queue = AMQQueueFactory.createAMQQueueImpl(queueName,
durable,
owner == null ? null : new AMQShortString(owner) /* These queues will have no owner */,
- autodelete /* Therefore autodelete makes no sence */, virtualHost, arguments);
+ autodelete /* Therefore autodelete makes no sence */,
+ virtualHost,
+ arguments,
+ queueConfiguration);
if (queue.isDurable())
{
@@ -247,10 +250,6 @@ public class VirtualHostConfiguration
}
}
-
-
-
- Configurator.configure(queue, queueConfiguration);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 447482ccf3..3047643021 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -203,13 +203,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
}
});
}// if exclusive and not durable
-
- Configuration virtualHostDefaultQueueConfiguration = VirtualHostConfiguration.getDefaultQueueConfiguration(queue);
- if (virtualHostDefaultQueueConfiguration != null)
- {
- Configurator.configure(queue, virtualHostDefaultQueueConfiguration);
- }
-
+
return queue;
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index c9c252f06d..90d7109df8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.exchange.Exchange;
@@ -207,4 +209,6 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
{
public void doTask(AMQQueue queue) throws AMQException;
}
+
+ void configure(Configuration virtualHostDefaultQueueConfiguration);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index 9dfc4449bb..19e98f416d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -20,8 +20,10 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.commons.configuration.Configuration;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.AMQException;
@@ -31,22 +33,43 @@ public class AMQQueueFactory
public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
public static AMQQueue createAMQQueueImpl(AMQShortString name,
+ boolean durable,
+ AMQShortString owner,
+ boolean autoDelete,
+ VirtualHost virtualHost, final FieldTable arguments)
+
+ throws AMQException
+ {
+
+ return createAMQQueueImpl(name, durable, owner, autoDelete,
+ virtualHost, arguments,
+ VirtualHostConfiguration.getDefaultQueueConfiguration(virtualHost));
+ }
+
+ public static AMQQueue createAMQQueueImpl(AMQShortString name,
boolean durable,
AMQShortString owner,
boolean autoDelete,
- VirtualHost virtualHost, final FieldTable arguments)
+ VirtualHost virtualHost, final FieldTable arguments,
+ Configuration queueConfiguration)
throws AMQException
{
final int priorities = arguments == null ? 1 : arguments.containsKey(X_QPID_PRIORITIES) ? arguments.getInteger(X_QPID_PRIORITIES) : 1;
+ AMQQueue q = null;
if(priorities > 1)
{
- return new AMQPriorityQueue(name, durable, owner, autoDelete, virtualHost, priorities);
+ q = new AMQPriorityQueue(name, durable, owner, autoDelete, virtualHost, priorities);
}
else
{
- return new SimpleAMQQueue(name, durable, owner, autoDelete, virtualHost);
+ q = new SimpleAMQQueue(name, durable, owner, autoDelete, virtualHost);
+ }
+ if (q != null && queueConfiguration != null)
+ {
+ q.configure(queueConfiguration);
}
+ return q;
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index b0f700d4a1..29c3f68286 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -3,6 +3,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.MessageStore;
@@ -14,6 +15,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.pool.ReadWriteRunnable;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.configuration.Configured;
+import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import javax.management.JMException;
@@ -160,12 +162,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
throw new AMQException("AMQQueue MBean creation has failed ", e);
}
+ resetNotifications();
+
+ }
+
+ private void resetNotifications()
+ {
// This ensure that the notification checks for the configured alerts are created.
setMaximumMessageAge(_maximumMessageAge);
setMaximumMessageCount(_maximumMessageCount);
setMaximumMessageSize(_maximumMessageSize);
setMaximumQueueDepth(_maximumQueueDepth);
-
}
// ------ Getters and Setters
@@ -1635,4 +1642,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
return ids;
}
+
+ public void configure(Configuration queueConfiguration)
+ {
+ Configurator.configure(this, queueConfiguration);
+ resetNotifications();
+ }
} \ No newline at end of file
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index 5eafc18378..0ada9cefee 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -40,6 +40,7 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.mina.common.ByteBuffer;
import javax.management.Notification;
@@ -47,6 +48,7 @@ import javax.management.Notification;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Collections;
+import java.util.Set;
/** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
public class AMQQueueAlertTest extends TestCase
@@ -251,6 +253,26 @@ public class AMQQueueAlertTest extends TestCase
assertEquals(new Long(0), new Long(_queueMBean.getQueueDepth()));
}
+ public void testAlertConfiguration() throws AMQException
+ {
+ // Setup configuration
+ CompositeConfiguration config = new CompositeConfiguration();
+ config.setProperty("maximumMessageSize", new Long(23));
+ config.setProperty("maximumMessageCount", new Long(24));
+ config.setProperty("maximumQueueDepth", new Long(25));
+ config.setProperty("maximumMessageAge", new Long(26));
+
+ // Create queue and set config
+ _queue = getNewQueue();
+ _queue.configure(config);
+
+ // Check alerts and notifications
+ Set<NotificationCheck> checks = _queue.getNotificationChecks();
+ assertNotNull("No checks found", checks);
+ assertFalse("Checks should not be empty", checks.isEmpty());
+ assertEquals("Wrong number of checks", 4, checks.size());
+ }
+
protected IncomingMessage message(final boolean immediate, long size) throws AMQException
{
MessagePublishInfo publish = new MessagePublishInfo()