summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2010-06-14 12:36:56 +0000
committerMartin Ritchie <ritchiem@apache.org>2010-06-14 12:36:56 +0000
commited9058f6a2584b9e4aa29caed99410ce71dbe814 (patch)
tree4702113e4326e9653a513b3f9be03d0fe3b2b8c8
parent16a79007527c096c7da40aa8cd0645279765227a (diff)
downloadqpid-python-ed9058f6a2584b9e4aa29caed99410ce71dbe814.tar.gz
QPID-2638 : Add initial support for Topics section in configuration file.
Added getQueueConfiguration(AMQQueue) which will return a new configuration for the given queue reflecting its binding status. This will allow the queue to be reconfigured during the binding process. Full Docs on this approach to appear on wiki. AMQQueue.configure and getConfiguration() have been updated to use ConfigurationPlugin rather than QueueConfiguration, The queue may be configured by a TopicConfiguration now. Update SlowConsumerTest to be GlobalQueuesTest and add a GlobalTopicsTest to match, where the config is added to the queues or topics section respectively git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@954433 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/AccessControl.java2
-rw-r--r--qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java2
-rw-r--r--qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java4
-rw-r--r--qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java2
-rw-r--r--qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalQueuesTest.java (renamed from qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java)115
-rw-r--r--qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalTopicsTest.java36
-rw-r--r--qpid/java/broker-plugins/firewall/src/main/java/org/apache/qpid/server/security/access/plugins/Firewall.java2
-rw-r--r--qpid/java/broker-plugins/simple-xml/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java120
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java76
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java26
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/LegacyAccess.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/LogMessageTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java7
21 files changed, 335 insertions, 98 deletions
diff --git a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/AccessControl.java b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/AccessControl.java
index 59fbaa4a34..e6e0059902 100644
--- a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/AccessControl.java
+++ b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/AccessControl.java
@@ -60,7 +60,7 @@ public class AccessControl extends AbstractPlugin
public AccessControl newInstance(ConfigurationPlugin config) throws ConfigurationException
{
- AccessControlConfiguration configuration = config.getConfiguration(AccessControlConfiguration.class);
+ AccessControlConfiguration configuration = config.getConfiguration(AccessControlConfiguration.class.getName());
// If there is no configuration for this plugin then don't load it.
if (configuration == null)
diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java
index c094c7eb6d..0949204a33 100644
--- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java
+++ b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java
@@ -95,7 +95,7 @@ public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin
"('messageAge','depth' or 'messageCount') must be specified.");
}
- SlowConsumerDetectionPolicyConfiguration policyConfig = getConfiguration(SlowConsumerDetectionPolicyConfiguration.class);
+ SlowConsumerDetectionPolicyConfiguration policyConfig = getConfiguration(SlowConsumerDetectionPolicyConfiguration.class.getName());
PluginManager pluginManager = ApplicationRegistry.getInstance().getPluginManager();
Map<String, SlowConsumerPolicyPluginFactory> factories = pluginManager.getPlugins(SlowConsumerPolicyPluginFactory.class);
diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
index 77819f8dbf..cac52c2fdf 100644
--- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
+++ b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
@@ -40,7 +40,7 @@ class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
{
public SlowConsumerDetection newInstance(VirtualHost vhost)
{
- SlowConsumerDetectionConfiguration config = vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class);
+ SlowConsumerDetectionConfiguration config = vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class.getName());
if (config == null)
{
@@ -74,7 +74,7 @@ class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
try
{
SlowConsumerDetectionQueueConfiguration config =
- q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class);
+ q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName());
if (checkQueueStatus(q, config))
{
diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java
index 3c67f6e6d7..c5275aa0bf 100644
--- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java
+++ b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java
@@ -45,7 +45,7 @@ public class TopicDeletePolicy implements SlowConsumerPolicyPlugin
public TopicDeletePolicy newInstance(ConfigurationPlugin configuration) throws ConfigurationException
{
TopicDeletePolicyConfiguration config =
- configuration.getConfiguration(TopicDeletePolicyConfiguration.class);
+ configuration.getConfiguration(TopicDeletePolicyConfiguration.class.getName());
TopicDeletePolicy policy = new TopicDeletePolicy();
policy.configure(config);
diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalQueuesTest.java
index f0be3d2db0..513bafa8ad 100644
--- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java
+++ b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalQueuesTest.java
@@ -47,7 +47,7 @@ import java.util.concurrent.TimeUnit;
* Slow consumers should on a topic should expect to receive a
* 506 : Resource Error if the hit a predefined threshold.
*/
-public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionListener, ConnectionListener
+public class GlobalQueuesTest extends QpidBrokerTestCase implements ExceptionListener, ConnectionListener
{
Topic _destination;
private CountDownLatch _disconnectionLatch = new CountDownLatch(1);
@@ -59,6 +59,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
private Exception _publisherError = null;
private JMSException _connectionException = null;
private static final long JOIN_WAIT = 5000;
+ protected String CONFIG_SECTION = ".queues";
@Override
public void setUp() throws IOException, ConfigurationException, NamingException
@@ -71,32 +72,26 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
+ getConnectionURL().getVirtualHost().substring(1) +
".slow-consumer-detection.timeunit", "SECONDS");
- setConfigurationProperty("virtualhosts.virtualhost."
- + getConnectionURL().getVirtualHost().substring(1) +
- ".queues.slow-consumer-detection." +
- "policy.name", "TopicDelete");
-
-
/**
* Queue Configuration
<slow-consumer-detection>
- <!-- The depth before which the policy will be applied-->
- <depth>4235264</depth>
-
- <!-- The message age before which the policy will be applied-->
- <messageAge>600000</messageAge>
-
- <!-- The number of message before which the policy will be applied-->
- <messageCount>50</messageCount>
-
- <!-- Policies configuration -->
- <policy>
- <name>TopicDelete</name>
- <topicDelete>
- <delete-persistent/>
- </topicDelete>
- </policy>
+ <!-- The depth before which the policy will be applied-->
+ <depth>4235264</depth>
+
+ <!-- The message age before which the policy will be applied-->
+ <messageAge>600000</messageAge>
+
+ <!-- The number of message before which the policy will be applied-->
+ <messageCount>50</messageCount>
+
+ <!-- Policies configuration -->
+ <policy>
+ <name>TopicDelete</name>
+ <topicDelete>
+ <delete-persistent/>
+ </topicDelete>
+ </policy>
</slow-consumer-detection>
*/
@@ -105,8 +100,8 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
* VirtualHost Plugin Configuration
<slow-consumer-detection>
- <delay>1</delay>
- <timeunit>MINUTES</timeunit>
+ <delay>1</delay>
+ <timeunit>MINUTES</timeunit>
</slow-consumer-detection>
*/
@@ -132,6 +127,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
* Clients should not have to modify their code based on the protocol in use.
*
* @param ackMode @see javax.jms.Session
+ *
* @throws Exception
*/
public void topicConsumer(int ackMode, boolean durable) throws Exception
@@ -260,6 +256,27 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
}
+ public void setConfig(String property, String value, boolean deleteDurable) throws NamingException, IOException, ConfigurationException
+ {
+ setConfigurationProperty("virtualhosts.virtualhost."
+ + getConnectionURL().getVirtualHost().substring(1) +
+ CONFIG_SECTION + ".slow-consumer-detection." +
+ "policy.name", "TopicDelete");
+
+ setConfigurationProperty("virtualhosts.virtualhost." +
+ getConnectionURL().getVirtualHost().substring(1) +
+ CONFIG_SECTION + ".slow-consumer-detection." +
+ property, value);
+
+ if (deleteDurable)
+ {
+ setConfigurationProperty("virtualhosts.virtualhost."
+ + getConnectionURL().getVirtualHost().substring(1) +
+ CONFIG_SECTION + ".slow-consumer-detection." +
+ "policy.topicdelete.delete-persistent", "");
+ }
+ }
+
/**
* Test that setting messageCount takes affect on topics
*
@@ -271,10 +288,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
{
MAX_QUEUE_MESSAGE_COUNT = 10;
- setConfigurationProperty("virtualhosts.virtualhost." +
- getConnectionURL().getVirtualHost().substring(1) +
- ".queues.slow-consumer-detection." +
- "messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1));
+ setConfig("messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1), false);
//Start the broker
super.setUp();
@@ -295,10 +309,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
{
MAX_QUEUE_MESSAGE_COUNT = 10;
- setConfigurationProperty("virtualhosts.virtualhost." +
- getConnectionURL().getVirtualHost().substring(1) +
- ".queues.slow-consumer-detection." +
- "depth", String.valueOf(MESSAGE_SIZE * 9));
+ setConfig("depth", String.valueOf(MESSAGE_SIZE * 9), false);
//Start the broker
super.setUp();
@@ -321,10 +332,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
{
MAX_QUEUE_MESSAGE_COUNT = 10;
- setConfigurationProperty("virtualhosts.virtualhost." +
- getConnectionURL().getVirtualHost().substring(1) +
- ".queues.slow-consumer-detection." +
- "messageAge", String.valueOf(DISCONNECTION_WAIT / 2));
+ setConfig("messageAge", String.valueOf(DISCONNECTION_WAIT / 2), false);
//Start the broker
super.setUp();
@@ -346,15 +354,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
{
MAX_QUEUE_MESSAGE_COUNT = 10;
- setConfigurationProperty("virtualhosts.virtualhost." +
- getConnectionURL().getVirtualHost().substring(1) +
- ".queues.slow-consumer-detection." +
- "messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1));
-
- setConfigurationProperty("virtualhosts.virtualhost."
- + getConnectionURL().getVirtualHost().substring(1) +
- ".queues.slow-consumer-detection." +
- "policy.topicdelete.delete-persistent", "");
+ setConfig("messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1), true);
//Start the broker
super.setUp();
@@ -377,15 +377,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
{
MAX_QUEUE_MESSAGE_COUNT = 10;
- setConfigurationProperty("virtualhosts.virtualhost." +
- getConnectionURL().getVirtualHost().substring(1) +
- ".queues.slow-consumer-detection." +
- "depth", String.valueOf(MESSAGE_SIZE * 9));
-
- setConfigurationProperty("virtualhosts.virtualhost."
- + getConnectionURL().getVirtualHost().substring(1) +
- ".queues.slow-consumer-detection." +
- "policy.topicdelete.delete-persistent", "");
+ setConfig("depth", String.valueOf(MESSAGE_SIZE * 9), true);
//Start the broker
super.setUp();
@@ -395,7 +387,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
topicConsumer(Session.AUTO_ACKNOWLEDGE, true);
}
- /**
+ /**
* Test that setting messageAge has an effect on topics
*
* Ensure we set the delete-persistent option
@@ -410,15 +402,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
{
MAX_QUEUE_MESSAGE_COUNT = 10;
- setConfigurationProperty("virtualhosts.virtualhost." +
- getConnectionURL().getVirtualHost().substring(1) +
- ".queues.slow-consumer-detection." +
- "messageAge", String.valueOf(DISCONNECTION_WAIT / 5));
-
- setConfigurationProperty("virtualhosts.virtualhost."
- + getConnectionURL().getVirtualHost().substring(1) +
- ".queues.slow-consumer-detection." +
- "policy.topicdelete.delete-persistent", "");
+ setConfig("messageAge", String.valueOf(DISCONNECTION_WAIT / 5), true);
//Start the broker
super.setUp();
@@ -438,6 +422,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
_disconnectionLatch.countDown();
}
+
/// Connection Listener
public void bytesSent(long count)
diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalTopicsTest.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalTopicsTest.java
new file mode 100644
index 0000000000..1f8103fa3c
--- /dev/null
+++ b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalTopicsTest.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest;
+
+import org.apache.commons.configuration.ConfigurationException;
+
+import javax.naming.NamingException;
+import java.io.IOException;
+
+public class GlobalTopicsTest extends GlobalQueuesTest
+{
+ @Override
+ public void setUp() throws NamingException, IOException, ConfigurationException
+ {
+ CONFIG_SECTION = ".topics";
+ super.setUp();
+ }
+}
diff --git a/qpid/java/broker-plugins/firewall/src/main/java/org/apache/qpid/server/security/access/plugins/Firewall.java b/qpid/java/broker-plugins/firewall/src/main/java/org/apache/qpid/server/security/access/plugins/Firewall.java
index 6fe0d03741..ae2baa95ca 100644
--- a/qpid/java/broker-plugins/firewall/src/main/java/org/apache/qpid/server/security/access/plugins/Firewall.java
+++ b/qpid/java/broker-plugins/firewall/src/main/java/org/apache/qpid/server/security/access/plugins/Firewall.java
@@ -44,7 +44,7 @@ public class Firewall extends AbstractPlugin
{
public Firewall newInstance(ConfigurationPlugin config) throws ConfigurationException
{
- FirewallConfiguration configuration = config.getConfiguration(FirewallConfiguration.class);
+ FirewallConfiguration configuration = config.getConfiguration(FirewallConfiguration.class.getName());
// If there is no configuration for this plugin then don't load it.
if (configuration == null)
diff --git a/qpid/java/broker-plugins/simple-xml/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java b/qpid/java/broker-plugins/simple-xml/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java
index 1bf8761978..c9a476c5f2 100644
--- a/qpid/java/broker-plugins/simple-xml/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java
+++ b/qpid/java/broker-plugins/simple-xml/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java
@@ -52,7 +52,7 @@ public class SimpleXML extends AbstractPlugin
{
public SimpleXML newInstance(ConfigurationPlugin config) throws ConfigurationException
{
- SimpleXMLConfiguration configuration = config.getConfiguration(SimpleXMLConfiguration.class);
+ SimpleXMLConfiguration configuration = config.getConfiguration(SimpleXMLConfiguration.class.getName());
// If there is no configuration for this plugin then don't load it.
if (configuration == null)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
index 50377eaf52..b24a326ed3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
@@ -32,6 +32,8 @@ import org.apache.qpid.server.configuration.BindingConfig;
import org.apache.qpid.server.configuration.BindingConfigType;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
+import org.apache.qpid.server.configuration.QueueConfiguration;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.BindingMessages;
@@ -201,6 +203,15 @@ public class BindingFactory
exchange.addBinding(b);
getConfigStore().addConfiguredObject(b);
b.logCreation();
+
+ //Reconfigure the queue for to reflect this new binding.
+ ConfigurationPlugin config = queue.getVirtualHost().getConfiguration().getQueueConfiguration(queue);
+
+ if (config != null)
+ {
+ // Reconfigure with new config.
+ queue.configure(config);
+ }
return true;
}
else
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java
new file mode 100644
index 0000000000..0218bf7273
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.configuration;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+public class TopicConfiguration extends ConfigurationPlugin
+{
+ public static final ConfigurationPluginFactory FACTORY = new TopicConfigurationFactory();
+
+ private static final String VIRTUALHOSTS_VIRTUALHOST_TOPICS = "virtualhosts.virtualhost.topics";
+
+ public static class TopicConfigurationFactory implements ConfigurationPluginFactory
+ {
+
+ public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException
+ {
+ TopicConfiguration topicsConfig = new TopicConfiguration();
+ topicsConfig.setConfiguration(path, config);
+ return topicsConfig;
+ }
+
+ public List<String> getParentPaths()
+ {
+ return Arrays.asList(VIRTUALHOSTS_VIRTUALHOST_TOPICS);
+ }
+ }
+
+ Map<String, TopicConfig> _topics = new HashMap<String, TopicConfig>();
+
+ public String[] getElementsProcessed()
+ {
+ return new String[]{"topic"};
+ }
+
+ @Override
+ public void validateConfiguration() throws ConfigurationException
+ {
+ if (_configuration.isEmpty())
+ {
+ throw new ConfigurationException("Topics section cannot be empty.");
+ }
+
+ int topics = _configuration.getList("topic.name").size();
+
+ for(int index=0; index<topics;index++)
+ {
+ TopicConfig topic = new TopicConfig();
+ topic.setConfiguration(VIRTUALHOSTS_VIRTUALHOST_TOPICS + ".topic", _configuration.subset("topic(" + index + ")"));
+
+ String topicName = _configuration.getString("topic(" + index + ").name");
+ if(_topics.containsKey(topicName))
+ {
+ throw new ConfigurationException("Topics section cannot contain two entries for the same topic.");
+ }
+ else
+ {
+ _topics.put(topicName, topic);
+ }
+ }
+ }
+
+ public String toString()
+ {
+ return getClass().getName() + ": Defined Topics:" + _topics.size();
+ }
+
+ public static class TopicConfig extends ConfigurationPlugin
+ {
+ @Override
+ public String[] getElementsProcessed()
+ {
+ return new String[]{"name"};
+ }
+
+ public String getName()
+ {
+ // If we don't specify a topic name then match all topics
+ String configName = getStringValue("name");
+ return configName == null ? "#" : configName;
+ }
+
+
+ public void validateConfiguration() throws ConfigurationException
+ {
+ if(_configuration.isEmpty())
+ {
+ throw new ConfigurationException("Topic section cannot be empty.");
+ }
+ }
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index 2be3311403..7dab02aee7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.configuration;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -29,7 +31,15 @@ import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.binding.BindingFactory;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeType;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MemoryMessageStore;
@@ -147,6 +157,72 @@ public class VirtualHostConfiguration extends ConfigurationPlugin
}
}
+ public ConfigurationPlugin getQueueConfiguration(AMQQueue queue)
+ {
+ VirtualHostConfiguration hostConfig = queue.getVirtualHost().getConfiguration();
+
+ // First check if we have a named queue configuration (the easy case)
+ if (Arrays.asList(hostConfig.getQueueNames()).contains(queue.getName()))
+ {
+ return null;
+ }
+
+ // We don't have an explicit queue config we must find out what we need.
+ ArrayList<Binding> bindings = new ArrayList<Binding>(queue.getBindings());
+
+ List<AMQShortString> exchangeClasses = new ArrayList<AMQShortString>(bindings.size());
+
+ //Remove default exchange
+ for (int index = 0; index < bindings.size(); index++)
+ {
+ // Ignore the DEFAULT Exchange binding
+ if (bindings.get(index).getExchange().getNameShortString().equals(ExchangeDefaults.DEFAULT_EXCHANGE_NAME))
+ {
+ bindings.remove(index);
+ }
+ else
+ {
+ exchangeClasses.add(bindings.get(index).getExchange().getType().getName());
+
+ if (exchangeClasses.size() > 1)
+ {
+ // If we have more than 1 class of exchange then we can only use the global queue configuration.
+ // and this will be returned from the default getQueueConfiguration
+ return null;
+ }
+ }
+ }
+
+ // If we are just bound the the default exchange then use the default.
+ if (bindings.isEmpty())
+ {
+ return null;
+ }
+
+ // If we are bound to only one type of exchange then we are going
+ // to have to resolve the configuration for that exchange.
+
+ String exchangeName = bindings.get(0).getExchange().getType().getName().toString();
+
+ // Lookup a Configuration handler for this Exchange.
+
+ // Build the expected class name. <Exchangename>sConfiguration
+ // i.e. TopicConfiguration or HeadersConfiguration
+ String exchangeClass = "org.apache.qpid.server.configuration."
+ + exchangeName.substring(0, 1).toUpperCase()
+ + exchangeName.substring(1) + "Configuration";
+
+ ConfigurationPlugin configPlugin
+ = queue.getVirtualHost().getConfiguration().getConfiguration(exchangeClass);
+
+
+ // now need to perform the queue-topic-topics-queue magic.
+
+ System.err.println("*********** Reconfiguring queue with config:"+configPlugin);
+
+ return configPlugin;
+ }
+
public long getMemoryUsageMaximum()
{
return getLongValue("queues.maximumMemoryUsage");
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java
index 9024c6aec6..1da1459f70 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java
@@ -38,8 +38,8 @@ public abstract class ConfigurationPlugin
{
protected static final Logger _logger = Logger.getLogger(ConfigurationPlugin.class);
- private Map<Class<? extends ConfigurationPlugin>, ConfigurationPlugin>
- _pluginConfiguration = new HashMap<Class<? extends ConfigurationPlugin>, ConfigurationPlugin>();
+ private Map<String, ConfigurationPlugin>
+ _pluginConfiguration = new HashMap<String, ConfigurationPlugin>();
protected Configuration _configuration;
@@ -69,7 +69,7 @@ public abstract class ConfigurationPlugin
return _configuration;
}
- public <C extends ConfigurationPlugin> C getConfiguration(Class<C> plugin)
+ public <C extends ConfigurationPlugin> C getConfiguration(String plugin)
{
return (C) _pluginConfiguration.get(plugin);
}
@@ -155,7 +155,7 @@ public abstract class ConfigurationPlugin
List<ConfigurationPlugin> handlers = configurationManager.getConfigurationPlugins(configurationElement, handled);
for (ConfigurationPlugin plugin : handlers)
{
- _pluginConfiguration.put(plugin.getClass(), plugin);
+ _pluginConfiguration.put(plugin.getClass().getName(), plugin);
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java
index 466bc9e228..8364635632 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java
@@ -34,6 +34,8 @@ import org.apache.felix.framework.Felix;
import org.apache.felix.framework.util.StringMap;
import org.apache.log4j.Logger;
import org.apache.qpid.common.Closeable;
+import org.apache.qpid.server.configuration.TopicConfiguration;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
import org.apache.qpid.server.exchange.ExchangeType;
import org.apache.qpid.server.security.SecurityManager;
@@ -79,6 +81,7 @@ public class PluginManager implements Closeable
_securityPlugins.put(pluginFactory.getPluginName(), pluginFactory);
}
for (ConfigurationPluginFactory configFactory : Arrays.asList(
+ TopicConfiguration.FACTORY,
SecurityManager.SecurityConfiguration.FACTORY,
AllowAll.AllowAllConfiguration.FACTORY,
DenyAll.DenyAllConfiguration.FACTORY,
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 3b60734a2e..225fbec930 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -274,9 +275,9 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer
public void doTask(AMQQueue queue) throws AMQException;
}
- void configure(QueueConfiguration config);
+ void configure(ConfigurationPlugin config);
- QueueConfiguration getConfiguration();
+ ConfigurationPlugin getConfiguration();
ManagedObject getManagedObject();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index f81a4a6911..451d59b2e9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -26,6 +26,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.pool.ReadWriteRunnable;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.ConfigStore;
@@ -185,7 +186,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
//TODO : persist creation time
private long _createTime = System.currentTimeMillis();
- private QueueConfiguration _queueConfiguration;
+ private ConfigurationPlugin _queueConfiguration;
@@ -2065,24 +2066,29 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
- public void configure(QueueConfiguration config)
+ public void configure(ConfigurationPlugin config)
{
if (config != null)
{
- setMaximumMessageAge(config.getMaximumMessageAge());
- setMaximumQueueDepth(config.getMaximumQueueDepth());
- setMaximumMessageSize(config.getMaximumMessageSize());
- setMaximumMessageCount(config.getMaximumMessageCount());
- setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap());
- _capacity = config.getCapacity();
- _flowResumeCapacity = config.getFlowResumeCapacity();
+ if (config instanceof QueueConfiguration)
+ {
+
+ setMaximumMessageAge(((QueueConfiguration)config).getMaximumMessageAge());
+ setMaximumQueueDepth(((QueueConfiguration)config).getMaximumQueueDepth());
+ setMaximumMessageSize(((QueueConfiguration)config).getMaximumMessageSize());
+ setMaximumMessageCount(((QueueConfiguration)config).getMaximumMessageCount());
+ setMinimumAlertRepeatGap(((QueueConfiguration)config).getMinimumAlertRepeatGap());
+ _capacity = ((QueueConfiguration)config).getCapacity();
+ _flowResumeCapacity = ((QueueConfiguration)config).getFlowResumeCapacity();
+ }
_queueConfiguration = config;
+
}
}
- public QueueConfiguration getConfiguration()
+ public ConfigurationPlugin getConfiguration()
{
return _queueConfiguration;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java
index 362d919a5e..ff28d76053 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java
@@ -153,7 +153,7 @@ public class SecurityManager
public Map<String, SecurityPlugin> configurePlugins(ConfigurationPlugin hostConfig) throws ConfigurationException
{
Map<String, SecurityPlugin> plugins = new HashMap<String, SecurityPlugin>();
- SecurityConfiguration securityConfig = hostConfig.getConfiguration(SecurityConfiguration.class);
+ SecurityConfiguration securityConfig = hostConfig.getConfiguration(SecurityConfiguration.class.getName());
// If we have no security Configuration then there is nothing to configure.
if (securityConfig != null)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java
index d4777b8cb3..db18a89231 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java
@@ -66,7 +66,7 @@ public class AllowAll extends BasicPlugin
{
public AllowAll newInstance(ConfigurationPlugin config) throws ConfigurationException
{
- AllowAllConfiguration configuration = config.getConfiguration(AllowAllConfiguration.class);
+ AllowAllConfiguration configuration = config.getConfiguration(AllowAllConfiguration.class.getName());
// If there is no configuration for this plugin then don't load it.
if (configuration == null)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java
index cd68511730..6c0fb1eaa4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java
@@ -66,7 +66,7 @@ public class DenyAll extends BasicPlugin
{
public DenyAll newInstance(ConfigurationPlugin config) throws ConfigurationException
{
- DenyAllConfiguration configuration = config.getConfiguration(DenyAllConfiguration.class);
+ DenyAllConfiguration configuration = config.getConfiguration(DenyAllConfiguration.class.getName());
// If there is no configuration for this plugin then don't load it.
if (configuration == null)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/LegacyAccess.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/LegacyAccess.java
index 1250cdcb1b..bd99cdd1fa 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/LegacyAccess.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/LegacyAccess.java
@@ -60,7 +60,7 @@ public class LegacyAccess extends BasicPlugin
{
public LegacyAccess newInstance(ConfigurationPlugin config) throws ConfigurationException
{
- LegacyAccessConfiguration configuration = config.getConfiguration(LegacyAccessConfiguration.class);
+ LegacyAccessConfiguration configuration = config.getConfiguration(LegacyAccessConfiguration.class.getName());
// If there is no configuration for this plugin then don't load it.
if (configuration == null)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/LogMessageTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/LogMessageTest.java
index 7a8cabf512..956bb6f8fa 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/LogMessageTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/LogMessageTest.java
@@ -38,7 +38,7 @@ public class LogMessageTest extends TestCase
{
Locale usLocal = Locale.US;
Locale.setDefault(usLocal);
- ResourceBundle _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.LogMessages",
+ ResourceBundle _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.Broker_logmessages",
usLocal);
assertNotNull("Unable to load ResourceBundle", _messages);
@@ -55,7 +55,7 @@ public class LogMessageTest extends TestCase
Locale.setDefault(japanese);
try
{
- ResourceBundle _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.LogMessages",
+ ResourceBundle _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.Broker_logmessages",
japanese);
assertNotNull("Unable to load ResourceBundle", _messages);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 70d146437f..51b049787c 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -21,8 +21,8 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.configuration.*;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.subscription.Subscription;
@@ -36,7 +36,6 @@ import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.AMQException;
-import javax.swing.*;
import java.util.List;
import java.util.Set;
import java.util.Map;
@@ -520,12 +519,12 @@ public class MockAMQQueue implements AMQQueue
//To change body of implemented methods use File | Settings | File Templates.
}
- public void configure(QueueConfiguration config)
+ public void configure(ConfigurationPlugin config)
{
}
- public QueueConfiguration getConfiguration()
+ public ConfigurationPlugin getConfiguration()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
}