summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java252
1 files changed, 228 insertions, 24 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index 5fbad74978..14ca147982 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -20,22 +20,33 @@
*/
package org.apache.qpid.server.queue;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.configuration.QueueConfiguration;
-
-import java.util.Map;
-import java.util.HashMap;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class AMQQueueFactory
{
- public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
+ public static final String X_QPID_PRIORITIES = "x-qpid-priorities";
public static final String QPID_LVQ_KEY = "qpid.LVQ_key";
public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
+ public static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key";
+
+ public static final String DLQ_ROUTING_KEY = "dlq";
+ public static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled";
+ public static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count";
+ public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
private abstract static class QueueProperty
{
@@ -80,6 +91,24 @@ public class AMQQueueFactory
}
+ private abstract static class QueueIntegerProperty extends QueueProperty
+ {
+ public QueueIntegerProperty(String argumentName)
+ {
+ super(argumentName);
+ }
+
+ public void setPropertyValue(AMQQueue queue, Object value)
+ {
+ if(value instanceof Number)
+ {
+ setPropertyValue(queue, ((Number)value).intValue());
+ }
+
+ }
+ abstract void setPropertyValue(AMQQueue queue, int value);
+ }
+
private static final QueueProperty[] DECLAREABLE_PROPERTIES = {
new QueueLongProperty("x-qpid-maximum-message-age")
{
@@ -122,8 +151,14 @@ public class AMQQueueFactory
{
queue.setFlowResumeCapacity(value);
}
+ },
+ new QueueIntegerProperty(X_QPID_MAXIMUM_DELIVERY_COUNT)
+ {
+ public void setPropertyValue(AMQQueue queue, int value)
+ {
+ queue.setMaximumDeliveryCount(value);
+ }
}
-
};
@@ -149,17 +184,31 @@ public class AMQQueueFactory
String owner,
boolean autoDelete,
boolean exclusive,
- VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException
+ VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException, AMQException
{
+ if (queueName == null)
+ {
+ throw new IllegalArgumentException("Queue name must not be null");
+ }
+
// Access check
if (!virtualHost.getSecurityManager().authoriseCreateQueue(autoDelete, durable, exclusive, null, null, new AMQShortString(queueName), owner))
{
String description = "Permission denied: queue-name '" + queueName + "'";
throw new AMQSecurityException(description);
}
-
+
+ QueueConfiguration queueConfiguration = virtualHost.getConfiguration().getQueueConfiguration(queueName);
+ boolean isDLQEnabled = isDLQEnabled(autoDelete, arguments, queueConfiguration);
+ if (isDLQEnabled)
+ {
+ validateDLNames(queueName);
+ }
+
int priorities = 1;
String conflationKey = null;
+ String sortingKey = null;
+
if(arguments != null)
{
if(arguments.containsKey(QPID_LAST_VALUE_QUEUE) || arguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY))
@@ -170,24 +219,32 @@ public class AMQQueueFactory
conflationKey = QPID_LVQ_KEY;
}
}
- else if(arguments.containsKey(X_QPID_PRIORITIES.toString()))
+ else if(arguments.containsKey(X_QPID_PRIORITIES))
{
- Object prioritiesObj = arguments.get(X_QPID_PRIORITIES.toString());
+ Object prioritiesObj = arguments.get(X_QPID_PRIORITIES);
if(prioritiesObj instanceof Number)
{
priorities = ((Number)prioritiesObj).intValue();
}
}
+ else if(arguments.containsKey(QPID_QUEUE_SORT_KEY))
+ {
+ sortingKey = (String)arguments.get(QPID_QUEUE_SORT_KEY);
+ }
}
AMQQueue q;
- if(conflationKey != null)
+ if(sortingKey != null)
+ {
+ q = new SortedQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, sortingKey);
+ }
+ else if(conflationKey != null)
{
q = new ConflationQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, conflationKey);
}
else if(priorities > 1)
{
- q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, priorities, arguments);
+ q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, priorities);
}
else
{
@@ -209,10 +266,63 @@ public class AMQQueueFactory
}
}
- return q;
+ if(isDLQEnabled)
+ {
+ final String dlExchangeName = getDeadLetterExchangeName(queueName);
+ final String dlQueueName = getDeadLetterQueueName(queueName);
- }
+ final ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ final ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
+ final QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+ Exchange dlExchange = null;
+ synchronized(exchangeRegistry)
+ {
+ dlExchange = exchangeRegistry.getExchange(dlExchangeName);
+
+ if(dlExchange == null)
+ {
+ dlExchange = exchangeFactory.createExchange(new AMQShortString(dlExchangeName), ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0);
+
+ exchangeRegistry.registerExchange(dlExchange);
+
+ //enter the dle in the persistent store
+ virtualHost.getDurableConfigurationStore().createExchange(dlExchange);
+ }
+ }
+
+ AMQQueue dlQueue = null;
+
+ synchronized(queueRegistry)
+ {
+ dlQueue = queueRegistry.getQueue(dlQueueName);
+
+ if(dlQueue == null)
+ {
+ //set args to disable DLQ'ing/MDC from the DLQ itself, preventing loops etc
+ final Map<String, Object> args = new HashMap<String, Object>();
+ args.put(X_QPID_DLQ_ENABLED, false);
+ args.put(X_QPID_MAXIMUM_DELIVERY_COUNT, 0);
+
+ dlQueue = createAMQQueueImpl(dlQueueName, true, owner, false, exclusive, virtualHost, args);
+ //enter the dlq in the persistent store
+ virtualHost.getDurableConfigurationStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args));
+ }
+ }
+
+ //ensure the queue is bound to the exchange
+ if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue))
+ {
+ //actual routing key used does not matter due to use of fanout exchange,
+ //but we will make the key 'dlq' as it can be logged at creation.
+ virtualHost.getBindingFactory().addBinding(DLQ_ROUTING_KEY, dlQueue, dlExchange, null);
+ }
+ q.setAlternateExchange(dlExchange);
+ }
+
+ return q;
+ }
public static AMQQueue createAMQQueueImpl(QueueConfiguration config, VirtualHost host) throws AMQException
{
@@ -223,26 +333,30 @@ public class AMQQueueFactory
boolean exclusive = config.getExclusive();
String owner = config.getOwner();
Map<String,Object> arguments = null;
+
if(config.isLVQ() || config.getLVQKey() != null)
{
-
arguments = new HashMap<String,Object>();
arguments.put(QPID_LAST_VALUE_QUEUE, 1);
arguments.put(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey() == null ? QPID_LVQ_KEY : config.getLVQKey());
}
- else
+ else if (config.getPriority() || config.getPriorities() > 0)
+ {
+ arguments = new HashMap<String,Object>();
+ arguments.put(X_QPID_PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities());
+ }
+ else if (config.getQueueSortKey() != null && !"".equals(config.getQueueSortKey()))
+ {
+ arguments = new HashMap<String,Object>();
+ arguments.put(QPID_QUEUE_SORT_KEY, config.getQueueSortKey());
+ }
+ if (!config.getAutoDelete() && config.isDeadLetterQueueEnabled())
{
- boolean priority = config.getPriority();
- int priorities = config.getPriorities();
- if(priority || priorities > 0)
+ if (arguments == null)
{
arguments = new HashMap<String,Object>();
- if (priorities < 0)
- {
- priorities = 10;
- }
- arguments.put("x-qpid-priorities", priorities);
}
+ arguments.put(X_QPID_DLQ_ENABLED, true);
}
if(config.isTopic())
@@ -259,4 +373,94 @@ public class AMQQueueFactory
return q;
}
+
+ /**
+ * Validates DLQ and DLE names
+ * <p>
+ * DLQ name and DLQ exchange name need to be validated in order to keep
+ * integrity in cases when queue name passes validation check but DLQ name
+ * or DL exchange name fails to pass it. Otherwise, we might have situations
+ * when queue is created but DL exchange or/and DLQ creation fail.
+ * <p>
+ *
+ * @param name
+ * queue name
+ * @throws IllegalArgumentException
+ * thrown if length of queue name or exchange name exceed 255
+ */
+ protected static void validateDLNames(String name)
+ {
+ // check if DLQ name and DLQ exchange name do not exceed 255
+ String exchangeName = getDeadLetterExchangeName(name);
+ if (exchangeName.length() > AMQShortString.MAX_LENGTH)
+ {
+ throw new IllegalArgumentException("DL exchange name '" + exchangeName
+ + "' length exceeds limit of " + AMQShortString.MAX_LENGTH + " characters for queue " + name);
+ }
+ String queueName = getDeadLetterQueueName(name);
+ if (queueName.length() > AMQShortString.MAX_LENGTH)
+ {
+ throw new IllegalArgumentException("DLQ queue name '" + queueName + "' length exceeds limit of "
+ + AMQShortString.MAX_LENGTH + " characters for queue " + name);
+ }
+ }
+
+ /**
+ * Checks if DLQ is enabled for the queue.
+ *
+ * @param autoDelete
+ * queue auto-delete flag
+ * @param arguments
+ * queue arguments
+ * @param qConfig
+ * queue configuration
+ * @return true if DLQ enabled
+ */
+ protected static boolean isDLQEnabled(boolean autoDelete, Map<String, Object> arguments, QueueConfiguration qConfig)
+ {
+ //feature is not to be enabled for temporary queues or when explicitly disabled by argument
+ if (!autoDelete)
+ {
+ boolean dlqArgumentPresent = arguments != null && arguments.containsKey(X_QPID_DLQ_ENABLED);
+ if (dlqArgumentPresent || qConfig.isDeadLetterQueueEnabled())
+ {
+ boolean dlqEnabled = true;
+ if (dlqArgumentPresent)
+ {
+ Object argument = arguments.get(X_QPID_DLQ_ENABLED);
+ dlqEnabled = argument instanceof Boolean && ((Boolean)argument).booleanValue();
+ }
+ return dlqEnabled;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Generates a dead letter queue name for a given queue name
+ *
+ * @param name
+ * queue name
+ * @return DLQ name
+ */
+ protected static String getDeadLetterQueueName(String name)
+ {
+ ServerConfiguration serverConfig = ApplicationRegistry.getInstance().getConfiguration();
+ String dlQueueName = name + serverConfig.getDeadLetterQueueSuffix();
+ return dlQueueName;
+ }
+
+ /**
+ * Generates a dead letter exchange name for a given queue name
+ *
+ * @param name
+ * queue name
+ * @return DL exchange name
+ */
+ protected static String getDeadLetterExchangeName(String name)
+ {
+ ServerConfiguration serverConfig = ApplicationRegistry.getInstance().getConfiguration();
+ String dlExchangeName = name + serverConfig.getDeadLetterExchangeSuffix();
+ return dlExchangeName;
+ }
}