diff options
| author | Robert Gemmell <robbie@apache.org> | 2010-12-07 12:26:21 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2010-12-07 12:26:21 +0000 |
| commit | c0b589bcc77638e6c415f5a7d2fe682774b862cb (patch) | |
| tree | dfbb651703c7c8639a774bc8ad3da199e217f5c0 | |
| parent | c363a8a2300a7230171a322dfed9d30c68c9c32f (diff) | |
| download | qpid-python-c0b589bcc77638e6c415f5a7d2fe682774b862cb.tar.gz | |
QPID-2974: add configuration for enabling/disabling DLQ functionality
Applied patch from Andrew Kennedy.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1043003 13f79535-47bb-0310-9956-ffa450edef68
8 files changed, 251 insertions, 29 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java index b6ea9942c5..ed8bd2d4e3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java @@ -127,5 +127,13 @@ public class QueueConfiguration { return _config.getString("lvqKey", null); } + + /** + * Check if dead letter queue delivery is enabled, deferring to the virtualhost configuration if not set. + */ + public boolean isDeadLetterQueueEnabled() + { + return _config.getBoolean("deadLetterQueues", _vHostConfig.isDeadLetterQueueEnabled()); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index a87bef0990..b464919818 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -712,4 +712,12 @@ public class ServerConfiguration implements SignalHandler { return getConfig().getString("deadLetterQueueSuffix", AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); } + + /** + * Check if dead letter queue delivery is enabled, defaults to disabled if not set. + */ + public boolean isDeadLetterQueueEnabled() + { + return getConfig().getBoolean("deadLetterQueues", false); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index ed4ccded89..2affd2c64d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -195,4 +195,12 @@ public class VirtualHostConfiguration { return _config.getLong("transactionTimeout.idleClose", 0L); } + + /** + * Check if dead letter queue delivery is enabled, deferring to the broker configuration if not set. + */ + public boolean isDeadLetterQueueEnabled() + { + return _config.getBoolean("queues.deadLetterQueues", ApplicationRegistry.getInstance().getConfiguration().isDeadLetterQueueEnabled()); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 5f5c6fe1a4..786e997e8e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -34,7 +34,6 @@ import org.apache.qpid.server.virtualhost.VirtualHost; public class AMQQueueFactory { - public static final boolean CONSTANT_THAT_NEEDS_REPLACED_IS_DLQ_CONFIGURED = true;//TODO: take from queue configuration public static final AMQShortString DLQ_ROUTING_KEY = new AMQShortString("dlq"); public static final AMQShortString X_QPID_DLQ_ENABLED = new AMQShortString("x-qpid-dlq-enabled"); public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ"; @@ -185,7 +184,7 @@ public class AMQQueueFactory boolean dlqArgPresent = (arguments != null && (arguments.containsKey(X_QPID_DLQ_ENABLED))); - if(dlqArgPresent || CONSTANT_THAT_NEEDS_REPLACED_IS_DLQ_CONFIGURED) + if(dlqArgPresent || qConfig.isDeadLetterQueueEnabled()) { //verify that the argument isn't explicitly disabling DLQ for this queue. boolean dlqEnabled = true; @@ -286,7 +285,7 @@ public class AMQQueueFactory arguments.setString(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey() == null ? QPID_LVQ_KEY : config.getLVQKey()); } - if (!config.getAutoDelete() && CONSTANT_THAT_NEEDS_REPLACED_IS_DLQ_CONFIGURED) + if (!config.getAutoDelete() && config.isDeadLetterQueueEnabled()) { if(arguments == null) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java index 9692cf2727..1447ef6da1 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java @@ -43,6 +43,7 @@ public class QueueConfigurationTest extends TestCase fullEnv.setProperty("queues.maximumMessageSize", 1); fullEnv.setProperty("queues.maximumMessageCount", 1); fullEnv.setProperty("queues.minimumAlertRepeatGap", 1); + fullEnv.setProperty("queues.deadLetterQueues", true); _fullHostConf = new VirtualHostConfiguration("test", fullEnv); @@ -133,4 +134,24 @@ public class QueueConfigurationTest extends TestCase assertEquals(1, qConf.getMinimumAlertRepeatGap()); } + /** + * Tests that the default setting for DLQ configuration is disabled, and verifies that it can be overridden + * at a broker or virtualhost level. + */ + public void testIsDeadLetterQueueEnabled() + { + // Check default value + QueueConfiguration qConf = new QueueConfiguration("test", _env, _emptyConf); + assertFalse(qConf.isDeadLetterQueueEnabled()); + + // Check explicit value + PropertiesConfiguration fullEnv = new PropertiesConfiguration(); + fullEnv.setProperty("deadLetterQueues", true); + qConf = new QueueConfiguration("test", fullEnv, _fullHostConf); + assertTrue(qConf.isDeadLetterQueueEnabled()); + + // Check inherited value + qConf = new QueueConfiguration("test", _env, _fullHostConf); + assertTrue(qConf.isDeadLetterQueueEnabled()); + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java index d3628916a3..095d0b8a52 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java @@ -24,6 +24,7 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.io.RandomAccessFile; +import java.io.Writer; import java.util.List; import java.util.Locale; @@ -37,6 +38,7 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.AMQMinaProtocolSession; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.protocol.TestIoSession; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -466,6 +468,18 @@ public class ServerConfigurationTest extends TestCase assertEquals(10, serverConfig.getMinimumAlertRepeatGap()); } + public void testIsDeadLetterQueueEnabled() throws ConfigurationException + { + // Check default + ServerConfiguration serverConfig = new ServerConfiguration(_config); + assertFalse(serverConfig.isDeadLetterQueueEnabled()); + + // Check value we set + _config.setProperty("deadLetterQueues", true); + serverConfig = new ServerConfiguration(_config); + assertTrue(serverConfig.isDeadLetterQueueEnabled()); + } + public void testGetProcessors() throws ConfigurationException { // Check default @@ -1640,4 +1654,105 @@ public class ServerConfigurationTest extends TestCase assertEquals("Incorrect virtualhost count", 1, config.getVirtualHosts().length); assertEquals("Incorrect virtualhost name", "test-one", oneHost.getName()); } + + /** + * Convenience method to output required security preamble for broker config + */ + private void writeSecurity(Writer out) throws Exception + { + out.write("\t<management><enabled>false</enabled></management>\n"); + out.write("\t<security>\n"); + out.write("\t\t<principal-databases>\n"); + out.write("\t\t\t<principal-database>\n"); + out.write("\t\t\t\t<name>passwordfile</name>\n"); + out.write("\t\t\t\t<class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class>\n"); + out.write("\t\t\t\t<attributes>\n"); + out.write("\t\t\t\t\t<attribute>\n"); + out.write("\t\t\t\t\t\t<name>passwordFile</name>\n"); + out.write("\t\t\t\t\t\t<value>/dev/null</value>\n"); + out.write("\t\t\t\t\t</attribute>\n"); + out.write("\t\t\t\t</attributes>\n"); + out.write("\t\t\t</principal-database>\n"); + out.write("\t\t</principal-databases>\n"); + out.write("\t\t<jmx>\n"); + out.write("\t\t\t<access>/dev/null</access>\n"); + out.write("\t\t\t<principal-database>passwordfile</principal-database>\n"); + out.write("\t\t</jmx>\n"); + out.write("\t</security>\n"); + } + + /** + * Test XML configuration file correctly enables dead letter queues + */ + public void testDeadLetterQueueConfigurationFile() throws Exception + { + // Write config + File xml = File.createTempFile(getClass().getName(), "xml"); + xml.deleteOnExit(); + FileWriter config = new FileWriter(xml); + config.write("<broker>\n"); + writeSecurity(config); + config.write("<deadLetterQueues>true</deadLetterQueues>\n"); + config.write("<virtualhosts>\n"); + config.write("<virtualhost>\n"); + config.write("<name>test</name>\n"); + config.write("<test>\n"); + config.write("<queues>\n"); + config.write("<deadLetterQueues>false</deadLetterQueues>\n"); + config.write("<queue>\n"); + config.write("<name>biggles</name>\n"); + config.write("<biggles>\n"); + config.write("<deadLetterQueues>true</deadLetterQueues>\n"); + config.write("</biggles>\n"); + config.write("</queue>\n"); + config.write("<queue>\n"); + config.write("<name>beetle</name>\n"); + config.write("<beetle />\n"); + config.write("</queue>\n"); + config.write("</queues>\n"); + config.write("</test>\n"); + config.write("</virtualhost>\n"); + config.write("<virtualhost>\n"); + config.write("<name>extra</name>\n"); + config.write("<extra>\n"); + config.write("<queues>\n"); + config.write("<queue>\n"); + config.write("<name>r2d2</name>\n"); + config.write("<r2d2>\n"); + config.write("<deadLetterQueues>false</deadLetterQueues>\n"); + config.write("</r2d2>\n"); + config.write("</queue>\n"); + config.write("<queue>\n"); + config.write("<name>c3p0</name>\n"); + config.write("<c3p0 />\n"); + config.write("</queue>\n"); + config.write("</queues>\n"); + config.write("</extra>\n"); + config.write("</virtualhost>\n"); + config.write("</virtualhosts>\n"); + config.write("</broker>\n"); + config.close(); + + // Load config + ServerConfiguration server = new ServerConfiguration(xml.getAbsoluteFile()); + ApplicationRegistry registry = new ConfigurationFileApplicationRegistry(xml); + ApplicationRegistry.initialise(registry, 1); + + VirtualHostConfiguration test = server.getVirtualHostConfig("test"); + VirtualHostConfiguration extra = server.getVirtualHostConfig("extra"); + + QueueConfiguration biggles = test.getQueueConfiguration("biggles"); + QueueConfiguration beetle = test.getQueueConfiguration("beetle"); + QueueConfiguration r2d2 = extra.getQueueConfiguration("r2d2"); + QueueConfiguration c3p0 = extra.getQueueConfiguration("c3p0"); + + // Validate config + assertTrue("Broker DLQ should be configured as enabled", server.isDeadLetterQueueEnabled()); + assertFalse("Test vhost DLQ should be configured as disabled", test.isDeadLetterQueueEnabled()); + assertTrue("Extra vhost DLQ should be enabled, using broker default", extra.isDeadLetterQueueEnabled()); + assertTrue("Biggles queue DLQ should be configured as enabled", biggles.isDeadLetterQueueEnabled()); + assertFalse("Beetle queue DLQ should be disabled, using test vhost default", beetle.isDeadLetterQueueEnabled()); + assertFalse("R2D2 queue DLQ should be configured as disabled", r2d2.isDeadLetterQueueEnabled()); + assertTrue("C3P0 queue DLQ should be enabled, using broker default", c3p0.isDeadLetterQueueEnabled()); + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java index 5d3b4e681a..9e47324c9f 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java @@ -21,9 +21,7 @@ package org.apache.qpid.server.configuration; import junit.framework.TestCase; -import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; -import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.queue.AMQPriorityQueue; import org.apache.qpid.server.queue.AMQQueue; @@ -46,6 +44,7 @@ public class VirtualHostConfigurationTest extends TestCase configXml = new XMLConfiguration(); configXml.setRootElementName("virtualhosts"); configXml.addProperty("virtualhost(-1).name", "test"); + configXml.addProperty("virtualhost(-1).name", "extra"); } public void tearDown() throws Exception @@ -59,28 +58,19 @@ public class VirtualHostConfigurationTest extends TestCase public void testQueuePriority() throws Exception { // Set up queue with 5 priorities - configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)", - "atest"); - configXml.addProperty("virtualhost.test.queues.queue.atest(-1).exchange", - "amq.direct"); - configXml.addProperty("virtualhost.test.queues.queue.atest.priorities", - "5"); + configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name", "atest"); + configXml.addProperty("virtualhost.test.queues.queue.atest.exchange", "amq.direct"); + configXml.addProperty("virtualhost.test.queues.queue.atest.priorities", "5"); // Set up queue with JMS style priorities - configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)", - "ptest"); - configXml.addProperty("virtualhost.test.queues.queue.ptest(-1).exchange", - "amq.direct"); - configXml.addProperty("virtualhost.test.queues.queue.ptest.priority", - "true"); + configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name", "ptest"); + configXml.addProperty("virtualhost.test.queues.queue.ptest.exchange", "amq.direct"); + configXml.addProperty("virtualhost.test.queues.queue.ptest.priority", "true"); // Set up queue with no priorities - configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)", - "ntest"); - configXml.addProperty("virtualhost.test.queues.queue.ntest(-1).exchange", - "amq.direct"); - configXml.addProperty("virtualhost.test.queues.queue.ntest.priority", - "false"); + configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name", "ntest"); + configXml.addProperty("virtualhost.test.queues.queue.ntest.exchange", "amq.direct"); + configXml.addProperty("virtualhost.test.queues.queue.ntest.priority", "false"); VirtualHost vhost = new VirtualHost(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test"))); @@ -107,13 +97,13 @@ public class VirtualHostConfigurationTest extends TestCase configXml.addProperty("virtualhost.test.queues.maximumMessageSize", "2"); configXml.addProperty("virtualhost.test.queues.maximumMessageAge", "3"); - configXml.addProperty("virtualhost.test.queues(-1).queue(1).name(1)", "atest"); - configXml.addProperty("virtualhost.test.queues.queue.atest(-1).exchange", "amq.direct"); - configXml.addProperty("virtualhost.test.queues.queue.atest(-1).maximumQueueDepth", "4"); - configXml.addProperty("virtualhost.test.queues.queue.atest(-1).maximumMessageSize", "5"); - configXml.addProperty("virtualhost.test.queues.queue.atest(-1).maximumMessageAge", "6"); + configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name", "atest"); + configXml.addProperty("virtualhost.test.queues.queue.atest.exchange", "amq.direct"); + configXml.addProperty("virtualhost.test.queues.queue.atest.maximumQueueDepth", "4"); + configXml.addProperty("virtualhost.test.queues.queue.atest.maximumMessageSize", "5"); + configXml.addProperty("virtualhost.test.queues.queue.atest.maximumMessageAge", "6"); - configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)", "btest"); + configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name", "btest"); VirtualHost vhost = new VirtualHost(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test"))); @@ -130,5 +120,57 @@ public class VirtualHostConfigurationTest extends TestCase assertEquals(3, bTest.getMaximumMessageAge()); } + + /** + * Tests the full set of configuration options for enabling DLQs in the broker configuration. + */ + public void testIsDeadLetterQueueEnabled() throws Exception + { + // Set up vhosts and queues + configXml.addProperty("virtualhost.test.queues.deadLetterQueues", "true"); + configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name", "biggles"); + configXml.addProperty("virtualhost.test.queues.queue.biggles.deadLetterQueues", "false"); + configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name", "beetle"); + + configXml.addProperty("virtualhost.extra.queues(-1).queue(-1).name", "r2d2"); + configXml.addProperty("virtualhost.extra.queues.queue.r2d2.deadLetterQueues", "true"); + configXml.addProperty("virtualhost.extra.queues(-1).queue(-1).name", "c3p0"); + + // Get vhosts + VirtualHost test = new VirtualHost(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test"))); + VirtualHost extra = new VirtualHost(new VirtualHostConfiguration("extra", configXml.subset("virtualhost.extra"))); + + // Enabled specifically + assertTrue("Test vhost DLQ was configured as enabled", test.getConfiguration().isDeadLetterQueueEnabled()); + assertTrue("r2d2 queue DLQ was configured as enabled", extra.getConfiguration().getQueueConfiguration("r2d2").isDeadLetterQueueEnabled()); + + // Enabled by test vhost default + assertTrue("beetle queue DLQ was configured as enabled", test.getConfiguration().getQueueConfiguration("beetle").isDeadLetterQueueEnabled()); + + // Disabled specifically + assertFalse("Biggles queue DLQ was configured as disabled", test.getConfiguration().getQueueConfiguration("biggles").isDeadLetterQueueEnabled()); + + // Using broker default of disabled + assertFalse("Extra vhost DLQ disabled, using broker default", extra.getConfiguration().isDeadLetterQueueEnabled()); + assertFalse("c3p0 queue DLQ was configured as disabled", extra.getConfiguration().getQueueConfiguration("c3p0").isDeadLetterQueueEnabled()); + + // Get queues + AMQQueue biggles = test.getQueueRegistry().getQueue(new AMQShortString("biggles")); + AMQQueue beetle = test.getQueueRegistry().getQueue(new AMQShortString("beetle")); + AMQQueue r2d2 = extra.getQueueRegistry().getQueue(new AMQShortString("r2d2")); + AMQQueue c3p0 = extra.getQueueRegistry().getQueue(new AMQShortString("c3p0")); + + // Disabled specifically for this queue, overriding virtualhost setting + assertNull("Biggles queue should not have alt exchange as DLQ should be configured as disabled: " + biggles.getAlternateExchange(), biggles.getAlternateExchange()); + + // Enabled for all queues on the virtualhost + assertNotNull("Beetle queue should have an alt exchange as DLQ should be enabled, using test vhost default", beetle.getAlternateExchange()); + + // Enabled specifically for this queue, overriding the default broker setting of disabled + assertNotNull("R2D2 queue should have an alt exchange as DLQ should be configured as enabled", r2d2.getAlternateExchange()); + + // Disabled by the default broker setting + assertNull("C3PO queue should not have an alt exchange as DLQ should be disabled, using broker default", c3p0.getAlternateExchange()); + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index e692069663..1c4f730949 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -76,6 +76,27 @@ public class AMQQueueFactoryTest extends TestCase AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("owner"), false, _virtualHost, null); assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass()); + assertNull("Queue should not have alternate exchange as DLQ isnt enabled", queue.getAlternateExchange()); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + } + + /** + * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument causes the alternate exchange to be set. + */ + public void testDeadLetterQueueEnabled() + { + FieldTable fieldTable = new FieldTable(); + fieldTable.put(new AMQShortString(AMQQueueFactory.X_QPID_DLQ_ENABLED), true); + + try + { + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("owner"), false, + _virtualHost, fieldTable); + assertNotNull("Queue should have an alternate exchange as DLQ is enabled", queue.getAlternateExchange()); } catch (AMQException e) { |
