diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java')
-rw-r--r-- | qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java | 542 |
1 files changed, 542 insertions, 0 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java new file mode 100644 index 0000000000..029c7e4f86 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -0,0 +1,542 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.server.configuration.BrokerProperties; +import org.apache.qpid.server.configuration.QueueConfiguration; +import org.apache.qpid.server.exchange.DefaultExchangeFactory; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.store.DurableConfigurationStoreHelper; +import org.apache.qpid.server.virtualhost.ExchangeExistsException; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class AMQQueueFactory implements QueueFactory +{ + public static final String QPID_DEFAULT_LVQ_KEY = "qpid.LVQ_key"; + + + public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ"; + public static final String DLQ_ROUTING_KEY = "dlq"; + private static final int MAX_LENGTH = 255; + + private final VirtualHost _virtualHost; + private final QueueRegistry _queueRegistry; + + public AMQQueueFactory(VirtualHost virtualHost, QueueRegistry queueRegistry) + { + _virtualHost = virtualHost; + _queueRegistry = queueRegistry; + } + + private abstract static class QueueProperty + { + + private final String _argumentName; + + + public QueueProperty(String argumentName) + { + _argumentName = argumentName; + } + + public String getArgumentName() + { + return _argumentName; + } + + + public abstract void setPropertyValue(AMQQueue queue, Object value); + + } + + private abstract static class QueueLongProperty extends QueueProperty + { + + public QueueLongProperty(String argumentName) + { + super(argumentName); + } + + public void setPropertyValue(AMQQueue queue, Object value) + { + if(value instanceof Number) + { + setPropertyValue(queue, ((Number)value).longValue()); + } + + } + + abstract void setPropertyValue(AMQQueue queue, long value); + + + } + + private abstract static class QueueIntegerProperty extends QueueProperty + { + public QueueIntegerProperty(String argumentName) + { + super(argumentName); + } + + public void setPropertyValue(AMQQueue queue, Object value) + { + if(value instanceof Number) + { + setPropertyValue(queue, ((Number)value).intValue()); + } + + } + abstract void setPropertyValue(AMQQueue queue, int value); + } + + private static final QueueProperty[] DECLAREABLE_PROPERTIES = { + new QueueLongProperty(Queue.ALERT_THRESHOLD_MESSAGE_AGE) + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setMaximumMessageAge(value); + } + }, + new QueueLongProperty(Queue.ALERT_THRESHOLD_MESSAGE_SIZE) + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setMaximumMessageSize(value); + } + }, + new QueueLongProperty(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES) + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setMaximumMessageCount(value); + } + }, + new QueueLongProperty(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES) + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setMaximumQueueDepth(value); + } + }, + new QueueLongProperty(Queue.ALERT_REPEAT_GAP) + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setMinimumAlertRepeatGap(value); + } + }, + new QueueLongProperty(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES) + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setCapacity(value); + } + }, + new QueueLongProperty(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES) + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setFlowResumeCapacity(value); + } + }, + new QueueIntegerProperty(Queue.MAXIMUM_DELIVERY_ATTEMPTS) + { + public void setPropertyValue(AMQQueue queue, int value) + { + queue.setMaximumDeliveryCount(value); + } + } + }; + + @Override + public AMQQueue restoreQueue(UUID id, + String queueName, + String owner, + boolean autoDelete, + boolean exclusive, + boolean deleteOnNoConsumer, + Map<String, Object> arguments) throws AMQSecurityException, AMQException + { + return createOrRestoreQueue(id, queueName, true, owner, autoDelete, exclusive, deleteOnNoConsumer, arguments, false); + + } + + /** + * @param id the id to use. + * @param deleteOnNoConsumer + */ + @Override + public AMQQueue createQueue(UUID id, + String queueName, + boolean durable, + String owner, + boolean autoDelete, + boolean exclusive, + boolean deleteOnNoConsumer, + Map<String, Object> arguments) throws AMQSecurityException, AMQException + { + return createOrRestoreQueue(id, queueName, durable, owner, autoDelete, exclusive, deleteOnNoConsumer, arguments, true); + } + + private AMQQueue createOrRestoreQueue(UUID id, + String queueName, + boolean durable, + String owner, + boolean autoDelete, + boolean exclusive, + boolean deleteOnNoConsumer, + Map<String, Object> arguments, + boolean createInStore) throws AMQSecurityException, AMQException + { + if (id == null) + { + throw new IllegalArgumentException("Queue id must not be null"); + } + if (queueName == null) + { + throw new IllegalArgumentException("Queue name must not be null"); + } + + + QueueConfiguration queueConfiguration = _virtualHost.getConfiguration().getQueueConfiguration(queueName); + + boolean createDLQ = createDLQ(autoDelete, arguments, queueConfiguration); + if (createDLQ) + { + validateDLNames(queueName); + } + + int priorities = 1; + String conflationKey = null; + String sortingKey = null; + + if(arguments != null) + { + if(arguments.containsKey(Queue.LVQ_KEY)) + { + conflationKey = (String) arguments.get(Queue.LVQ_KEY); + if(conflationKey == null) + { + conflationKey = QPID_DEFAULT_LVQ_KEY; + } + } + else if(arguments.containsKey(Queue.PRIORITIES)) + { + Object prioritiesObj = arguments.get(Queue.PRIORITIES); + if(prioritiesObj instanceof Number) + { + priorities = ((Number)prioritiesObj).intValue(); + } + else if(prioritiesObj instanceof String) + { + try + { + priorities = Integer.parseInt(prioritiesObj.toString()); + } + catch (NumberFormatException e) + { + // TODO - should warn here of invalid format + } + } + else + { + // TODO - should warn here of invalid format + } + } + else if(arguments.containsKey(Queue.SORT_KEY)) + { + sortingKey = (String)arguments.get(Queue.SORT_KEY); + } + } + + AMQQueue q; + if(sortingKey != null) + { + q = new SortedQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, sortingKey); + } + else if(conflationKey != null) + { + q = new ConflationQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, conflationKey); + } + else if(priorities > 1) + { + q = new AMQPriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, priorities); + } + else + { + q = new SimpleAMQQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments); + } + + q.setDeleteOnNoConsumers(deleteOnNoConsumer); + + //Register the new queue + _queueRegistry.registerQueue(q); + + q.configure(_virtualHost.getConfiguration().getQueueConfiguration(queueName)); + + if(arguments != null) + { + for(QueueProperty p : DECLAREABLE_PROPERTIES) + { + if(arguments.containsKey(p.getArgumentName())) + { + p.setPropertyValue(q, arguments.get(p.getArgumentName())); + } + } + + if(arguments.get(Queue.NO_LOCAL) instanceof Boolean) + { + q.setNoLocal((Boolean)arguments.get(Queue.NO_LOCAL)); + } + + } + + if(createDLQ) + { + final String dlExchangeName = getDeadLetterExchangeName(queueName); + final String dlQueueName = getDeadLetterQueueName(queueName); + + Exchange dlExchange = null; + final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, _virtualHost.getName()); + + try + { + dlExchange = _virtualHost.createExchange(dlExchangeId, + dlExchangeName, + ExchangeDefaults.FANOUT_EXCHANGE_CLASS, + true, false, null); + } + catch(ExchangeExistsException e) + { + // We're ok if the exchange already exists + dlExchange = e.getExistingExchange(); + } + + AMQQueue dlQueue = null; + + synchronized(_queueRegistry) + { + dlQueue = _queueRegistry.getQueue(dlQueueName); + + if(dlQueue == null) + { + //set args to disable DLQ'ing/MDC from the DLQ itself, preventing loops etc + final Map<String, Object> args = new HashMap<String, Object>(); + args.put(Queue.CREATE_DLQ_ON_CREATION, false); + args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0); + + dlQueue = _virtualHost.createQueue(UUIDGenerator.generateQueueUUID(dlQueueName, _virtualHost.getName()), dlQueueName, true, owner, false, exclusive, + false, args); + } + } + + //ensure the queue is bound to the exchange + if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue)) + { + //actual routing key used does not matter due to use of fanout exchange, + //but we will make the key 'dlq' as it can be logged at creation. + dlExchange.addBinding(DLQ_ROUTING_KEY, dlQueue, null); + } + q.setAlternateExchange(dlExchange); + } + else if(arguments != null && arguments.get(Queue.ALTERNATE_EXCHANGE) instanceof String) + { + + final String altExchangeAttr = (String) arguments.get(Queue.ALTERNATE_EXCHANGE); + Exchange altExchange; + try + { + altExchange = _virtualHost.getExchange(UUID.fromString(altExchangeAttr)); + } + catch(IllegalArgumentException e) + { + altExchange = _virtualHost.getExchange(altExchangeAttr); + } + q.setAlternateExchange(altExchange); + } + + if (createInStore && q.isDurable() && !q.isAutoDelete()) + { + DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(), q); + } + + return q; + } + + public AMQQueue createAMQQueueImpl(QueueConfiguration config) throws AMQException + { + String queueName = config.getName(); + + boolean durable = config.getDurable(); + boolean autodelete = config.getAutoDelete(); + boolean exclusive = config.getExclusive(); + String owner = config.getOwner(); + Map<String, Object> arguments = createQueueArgumentsFromConfig(config); + + // we need queues that are defined in config to have deterministic ids. + UUID id = UUIDGenerator.generateQueueUUID(queueName, _virtualHost.getName()); + + AMQQueue q = createQueue(id, queueName, durable, owner, autodelete, exclusive, false, arguments); + q.configure(config); + return q; + } + + /** + * Validates DLQ and DLE names + * <p> + * DLQ name and DLQ exchange name need to be validated in order to keep + * integrity in cases when queue name passes validation check but DLQ name + * or DL exchange name fails to pass it. Otherwise, we might have situations + * when queue is created but DL exchange or/and DLQ creation fail. + * <p> + * + * @param name + * queue name + * @throws IllegalArgumentException + * thrown if length of queue name or exchange name exceed 255 + */ + protected static void validateDLNames(String name) + { + // check if DLQ name and DLQ exchange name do not exceed 255 + String exchangeName = getDeadLetterExchangeName(name); + if (exchangeName.length() > MAX_LENGTH) + { + throw new IllegalArgumentException("DL exchange name '" + exchangeName + + "' length exceeds limit of " + MAX_LENGTH + " characters for queue " + name); + } + String queueName = getDeadLetterQueueName(name); + if (queueName.length() > MAX_LENGTH) + { + throw new IllegalArgumentException("DLQ queue name '" + queueName + "' length exceeds limit of " + + MAX_LENGTH + " characters for queue " + name); + } + } + + /** + * Checks if DLQ is enabled for the queue. + * + * @param autoDelete + * queue auto-delete flag + * @param arguments + * queue arguments + * @param qConfig + * queue configuration + * @return true if DLQ enabled + */ + protected static boolean createDLQ(boolean autoDelete, Map<String, Object> arguments, QueueConfiguration qConfig) + { + //feature is not to be enabled for temporary queues or when explicitly disabled by argument + if (!(autoDelete || (arguments != null && arguments.containsKey(Queue.ALTERNATE_EXCHANGE)))) + { + boolean dlqArgumentPresent = arguments != null + && arguments.containsKey(Queue.CREATE_DLQ_ON_CREATION); + if (dlqArgumentPresent || qConfig.isDeadLetterQueueEnabled()) + { + boolean dlqEnabled = true; + if (dlqArgumentPresent) + { + Object argument = arguments.get(Queue.CREATE_DLQ_ON_CREATION); + dlqEnabled = (argument instanceof Boolean && ((Boolean)argument).booleanValue()) + || (argument instanceof String && Boolean.parseBoolean(argument.toString())); + } + return dlqEnabled ; + } + } + return false; + } + + /** + * Generates a dead letter queue name for a given queue name + * + * @param name + * queue name + * @return DLQ name + */ + protected static String getDeadLetterQueueName(String name) + { + return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, DEFAULT_DLQ_NAME_SUFFIX); + } + + /** + * Generates a dead letter exchange name for a given queue name + * + * @param name + * queue name + * @return DL exchange name + */ + protected static String getDeadLetterExchangeName(String name) + { + return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); + } + + private static Map<String, Object> createQueueArgumentsFromConfig(QueueConfiguration config) + { + Map<String,Object> arguments = new HashMap<String,Object>(); + + if(config.getArguments() != null && !config.getArguments().isEmpty()) + { + arguments.putAll(QueueArgumentsConverter.convertWireArgsToModel(new HashMap<String, Object>(config.getArguments()))); + } + + if(config.isLVQ() || config.getLVQKey() != null) + { + arguments.put(Queue.LVQ_KEY, config.getLVQKey() == null ? QPID_DEFAULT_LVQ_KEY : config.getLVQKey()); + } + else if (config.getPriority() || config.getPriorities() > 0) + { + arguments.put(Queue.PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities()); + } + else if (config.getQueueSortKey() != null && !"".equals(config.getQueueSortKey())) + { + arguments.put(Queue.SORT_KEY, config.getQueueSortKey()); + } + + if (!config.getAutoDelete() && config.isDeadLetterQueueEnabled()) + { + arguments.put(Queue.CREATE_DLQ_ON_CREATION, true); + } + + if (config.getDescription() != null && !"".equals(config.getDescription())) + { + arguments.put(Queue.DESCRIPTION, config.getDescription()); + } + + if (arguments.isEmpty()) + { + return Collections.emptyMap(); + } + else + { + return arguments; + } + } + +} |