diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java | 252 |
1 files changed, 228 insertions, 24 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 5fbad74978..14ca147982 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -20,22 +20,33 @@ */ package org.apache.qpid.server.queue; +import java.util.HashMap; +import java.util.Map; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.configuration.QueueConfiguration; - -import java.util.Map; -import java.util.HashMap; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; public class AMQQueueFactory { - public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities"); + public static final String X_QPID_PRIORITIES = "x-qpid-priorities"; public static final String QPID_LVQ_KEY = "qpid.LVQ_key"; public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue"; public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key"; + public static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key"; + + public static final String DLQ_ROUTING_KEY = "dlq"; + public static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled"; + public static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count"; + public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ"; private abstract static class QueueProperty { @@ -80,6 +91,24 @@ public class AMQQueueFactory } + private abstract static class QueueIntegerProperty extends QueueProperty + { + public QueueIntegerProperty(String argumentName) + { + super(argumentName); + } + + public void setPropertyValue(AMQQueue queue, Object value) + { + if(value instanceof Number) + { + setPropertyValue(queue, ((Number)value).intValue()); + } + + } + abstract void setPropertyValue(AMQQueue queue, int value); + } + private static final QueueProperty[] DECLAREABLE_PROPERTIES = { new QueueLongProperty("x-qpid-maximum-message-age") { @@ -122,8 +151,14 @@ public class AMQQueueFactory { queue.setFlowResumeCapacity(value); } + }, + new QueueIntegerProperty(X_QPID_MAXIMUM_DELIVERY_COUNT) + { + public void setPropertyValue(AMQQueue queue, int value) + { + queue.setMaximumDeliveryCount(value); + } } - }; @@ -149,17 +184,31 @@ public class AMQQueueFactory String owner, boolean autoDelete, boolean exclusive, - VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException + VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException, AMQException { + if (queueName == null) + { + throw new IllegalArgumentException("Queue name must not be null"); + } + // Access check if (!virtualHost.getSecurityManager().authoriseCreateQueue(autoDelete, durable, exclusive, null, null, new AMQShortString(queueName), owner)) { String description = "Permission denied: queue-name '" + queueName + "'"; throw new AMQSecurityException(description); } - + + QueueConfiguration queueConfiguration = virtualHost.getConfiguration().getQueueConfiguration(queueName); + boolean isDLQEnabled = isDLQEnabled(autoDelete, arguments, queueConfiguration); + if (isDLQEnabled) + { + validateDLNames(queueName); + } + int priorities = 1; String conflationKey = null; + String sortingKey = null; + if(arguments != null) { if(arguments.containsKey(QPID_LAST_VALUE_QUEUE) || arguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY)) @@ -170,24 +219,32 @@ public class AMQQueueFactory conflationKey = QPID_LVQ_KEY; } } - else if(arguments.containsKey(X_QPID_PRIORITIES.toString())) + else if(arguments.containsKey(X_QPID_PRIORITIES)) { - Object prioritiesObj = arguments.get(X_QPID_PRIORITIES.toString()); + Object prioritiesObj = arguments.get(X_QPID_PRIORITIES); if(prioritiesObj instanceof Number) { priorities = ((Number)prioritiesObj).intValue(); } } + else if(arguments.containsKey(QPID_QUEUE_SORT_KEY)) + { + sortingKey = (String)arguments.get(QPID_QUEUE_SORT_KEY); + } } AMQQueue q; - if(conflationKey != null) + if(sortingKey != null) + { + q = new SortedQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, sortingKey); + } + else if(conflationKey != null) { q = new ConflationQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, conflationKey); } else if(priorities > 1) { - q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, priorities, arguments); + q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, priorities); } else { @@ -209,10 +266,63 @@ public class AMQQueueFactory } } - return q; + if(isDLQEnabled) + { + final String dlExchangeName = getDeadLetterExchangeName(queueName); + final String dlQueueName = getDeadLetterQueueName(queueName); - } + final ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + final ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory(); + final QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + + Exchange dlExchange = null; + synchronized(exchangeRegistry) + { + dlExchange = exchangeRegistry.getExchange(dlExchangeName); + + if(dlExchange == null) + { + dlExchange = exchangeFactory.createExchange(new AMQShortString(dlExchangeName), ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0); + + exchangeRegistry.registerExchange(dlExchange); + + //enter the dle in the persistent store + virtualHost.getDurableConfigurationStore().createExchange(dlExchange); + } + } + + AMQQueue dlQueue = null; + + synchronized(queueRegistry) + { + dlQueue = queueRegistry.getQueue(dlQueueName); + + if(dlQueue == null) + { + //set args to disable DLQ'ing/MDC from the DLQ itself, preventing loops etc + final Map<String, Object> args = new HashMap<String, Object>(); + args.put(X_QPID_DLQ_ENABLED, false); + args.put(X_QPID_MAXIMUM_DELIVERY_COUNT, 0); + + dlQueue = createAMQQueueImpl(dlQueueName, true, owner, false, exclusive, virtualHost, args); + //enter the dlq in the persistent store + virtualHost.getDurableConfigurationStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args)); + } + } + + //ensure the queue is bound to the exchange + if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue)) + { + //actual routing key used does not matter due to use of fanout exchange, + //but we will make the key 'dlq' as it can be logged at creation. + virtualHost.getBindingFactory().addBinding(DLQ_ROUTING_KEY, dlQueue, dlExchange, null); + } + q.setAlternateExchange(dlExchange); + } + + return q; + } public static AMQQueue createAMQQueueImpl(QueueConfiguration config, VirtualHost host) throws AMQException { @@ -223,26 +333,30 @@ public class AMQQueueFactory boolean exclusive = config.getExclusive(); String owner = config.getOwner(); Map<String,Object> arguments = null; + if(config.isLVQ() || config.getLVQKey() != null) { - arguments = new HashMap<String,Object>(); arguments.put(QPID_LAST_VALUE_QUEUE, 1); arguments.put(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey() == null ? QPID_LVQ_KEY : config.getLVQKey()); } - else + else if (config.getPriority() || config.getPriorities() > 0) + { + arguments = new HashMap<String,Object>(); + arguments.put(X_QPID_PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities()); + } + else if (config.getQueueSortKey() != null && !"".equals(config.getQueueSortKey())) + { + arguments = new HashMap<String,Object>(); + arguments.put(QPID_QUEUE_SORT_KEY, config.getQueueSortKey()); + } + if (!config.getAutoDelete() && config.isDeadLetterQueueEnabled()) { - boolean priority = config.getPriority(); - int priorities = config.getPriorities(); - if(priority || priorities > 0) + if (arguments == null) { arguments = new HashMap<String,Object>(); - if (priorities < 0) - { - priorities = 10; - } - arguments.put("x-qpid-priorities", priorities); } + arguments.put(X_QPID_DLQ_ENABLED, true); } if(config.isTopic()) @@ -259,4 +373,94 @@ public class AMQQueueFactory return q; } + + /** + * Validates DLQ and DLE names + * <p> + * DLQ name and DLQ exchange name need to be validated in order to keep + * integrity in cases when queue name passes validation check but DLQ name + * or DL exchange name fails to pass it. Otherwise, we might have situations + * when queue is created but DL exchange or/and DLQ creation fail. + * <p> + * + * @param name + * queue name + * @throws IllegalArgumentException + * thrown if length of queue name or exchange name exceed 255 + */ + protected static void validateDLNames(String name) + { + // check if DLQ name and DLQ exchange name do not exceed 255 + String exchangeName = getDeadLetterExchangeName(name); + if (exchangeName.length() > AMQShortString.MAX_LENGTH) + { + throw new IllegalArgumentException("DL exchange name '" + exchangeName + + "' length exceeds limit of " + AMQShortString.MAX_LENGTH + " characters for queue " + name); + } + String queueName = getDeadLetterQueueName(name); + if (queueName.length() > AMQShortString.MAX_LENGTH) + { + throw new IllegalArgumentException("DLQ queue name '" + queueName + "' length exceeds limit of " + + AMQShortString.MAX_LENGTH + " characters for queue " + name); + } + } + + /** + * Checks if DLQ is enabled for the queue. + * + * @param autoDelete + * queue auto-delete flag + * @param arguments + * queue arguments + * @param qConfig + * queue configuration + * @return true if DLQ enabled + */ + protected static boolean isDLQEnabled(boolean autoDelete, Map<String, Object> arguments, QueueConfiguration qConfig) + { + //feature is not to be enabled for temporary queues or when explicitly disabled by argument + if (!autoDelete) + { + boolean dlqArgumentPresent = arguments != null && arguments.containsKey(X_QPID_DLQ_ENABLED); + if (dlqArgumentPresent || qConfig.isDeadLetterQueueEnabled()) + { + boolean dlqEnabled = true; + if (dlqArgumentPresent) + { + Object argument = arguments.get(X_QPID_DLQ_ENABLED); + dlqEnabled = argument instanceof Boolean && ((Boolean)argument).booleanValue(); + } + return dlqEnabled; + } + } + return false; + } + + /** + * Generates a dead letter queue name for a given queue name + * + * @param name + * queue name + * @return DLQ name + */ + protected static String getDeadLetterQueueName(String name) + { + ServerConfiguration serverConfig = ApplicationRegistry.getInstance().getConfiguration(); + String dlQueueName = name + serverConfig.getDeadLetterQueueSuffix(); + return dlQueueName; + } + + /** + * Generates a dead letter exchange name for a given queue name + * + * @param name + * queue name + * @return DL exchange name + */ + protected static String getDeadLetterExchangeName(String name) + { + ServerConfiguration serverConfig = ApplicationRegistry.getInstance().getConfiguration(); + String dlExchangeName = name + serverConfig.getDeadLetterExchangeSuffix(); + return dlExchangeName; + } } |