diff options
Diffstat (limited to 'java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java')
-rw-r--r-- | java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java | 490 |
1 files changed, 167 insertions, 323 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 409c528a4b..5003db1385 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -20,12 +20,14 @@ */ package org.apache.qpid.server.queue; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; import org.apache.qpid.server.exchange.AMQUnknownExchangeType; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.configuration.BrokerProperties; @@ -36,6 +38,7 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; +import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.ExchangeExistsException; import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; @@ -45,7 +48,6 @@ import org.apache.qpid.server.virtualhost.QueueExistsException; public class AMQQueueFactory implements QueueFactory { - public static final String QPID_DEFAULT_LVQ_KEY = "qpid.LVQ_key"; public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ"; @@ -59,381 +61,207 @@ public class AMQQueueFactory implements QueueFactory { _virtualHost = virtualHost; _queueRegistry = queueRegistry; + } + + @Override + public AMQQueue restoreQueue(Map<String, Object> attributes) throws QpidSecurityException + { + return createOrRestoreQueue(null, attributes, false); + } - private abstract static class QueueProperty + @Override + public AMQQueue createQueue(final AMQSessionModel creatingSession, + Map<String, Object> attributes) throws QpidSecurityException + { + return createOrRestoreQueue(creatingSession, attributes, true); + } + + private AMQQueue createOrRestoreQueue(final AMQSessionModel creatingSession, Map<String, Object> attributes, + boolean createInStore) throws QpidSecurityException { - private final String _argumentName; + String queueName = MapValueConverter.getStringAttribute(Queue.NAME,attributes); - public QueueProperty(String argumentName) + QueueConfiguration config = _virtualHost.getConfiguration().getQueueConfiguration(queueName); + + if (!attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_AGE) && config.getMaximumMessageAge() != 0) { - _argumentName = argumentName; + attributes.put(Queue.ALERT_THRESHOLD_MESSAGE_AGE, config.getMaximumMessageAge()); } - - public String getArgumentName() + if (!attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES) && config.getMaximumQueueDepth() != 0) { - return _argumentName; + attributes.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, config.getMaximumQueueDepth()); } - - - public abstract void setPropertyValue(AMQQueue queue, Object value); - - } - - private abstract static class QueueLongProperty extends QueueProperty - { - - public QueueLongProperty(String argumentName) + if (!attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_SIZE) && config.getMaximumMessageSize() != 0) { - super(argumentName); + attributes.put(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, config.getMaximumMessageSize()); } - - public void setPropertyValue(AMQQueue queue, Object value) + if (!attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES) && config.getMaximumMessageCount() != 0) { - if(value instanceof Number) - { - setPropertyValue(queue, ((Number)value).longValue()); - } - + attributes.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, config.getMaximumMessageCount()); } - - abstract void setPropertyValue(AMQQueue queue, long value); - - - } - - private abstract static class QueueIntegerProperty extends QueueProperty - { - public QueueIntegerProperty(String argumentName) + if (!attributes.containsKey(Queue.ALERT_REPEAT_GAP) && config.getMinimumAlertRepeatGap() != 0) { - super(argumentName); + attributes.put(Queue.ALERT_REPEAT_GAP, config.getMinimumAlertRepeatGap()); } - - public void setPropertyValue(AMQQueue queue, Object value) + if (config.getMaxDeliveryCount() != 0 && !attributes.containsKey(Queue.MAXIMUM_DELIVERY_ATTEMPTS)) { - if(value instanceof Number) - { - setPropertyValue(queue, ((Number)value).intValue()); - } - + attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, config.getMaxDeliveryCount()); } - abstract void setPropertyValue(AMQQueue queue, int value); - } - - private static final QueueProperty[] DECLARABLE_PROPERTIES = { - new QueueLongProperty(Queue.ALERT_THRESHOLD_MESSAGE_AGE) - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setMaximumMessageAge(value); - } - }, - new QueueLongProperty(Queue.ALERT_THRESHOLD_MESSAGE_SIZE) - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setMaximumMessageSize(value); - } - }, - new QueueLongProperty(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES) - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setMaximumMessageCount(value); - } - }, - new QueueLongProperty(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES) - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setMaximumQueueDepth(value); - } - }, - new QueueLongProperty(Queue.ALERT_REPEAT_GAP) - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setMinimumAlertRepeatGap(value); - } - }, - new QueueLongProperty(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES) - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setCapacity(value); - } - }, - new QueueLongProperty(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES) - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setFlowResumeCapacity(value); - } - }, - new QueueIntegerProperty(Queue.MAXIMUM_DELIVERY_ATTEMPTS) - { - public void setPropertyValue(AMQQueue queue, int value) - { - queue.setMaximumDeliveryCount(value); - } - } - }; - - @Override - public AMQQueue restoreQueue(UUID id, - String queueName, - String owner, - boolean autoDelete, - boolean exclusive, - boolean deleteOnNoConsumer, - Map<String, Object> arguments) throws QpidSecurityException - { - return createOrRestoreQueue(id, queueName, true, owner, autoDelete, exclusive, deleteOnNoConsumer, arguments, false); - - } - - /** - * @param id the id to use. - * @param deleteOnNoConsumer - */ - @Override - public AMQQueue createQueue(UUID id, - String queueName, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - boolean deleteOnNoConsumer, - Map<String, Object> arguments) throws QpidSecurityException - { - return createOrRestoreQueue(id, queueName, durable, owner, autoDelete, exclusive, deleteOnNoConsumer, arguments, true); - } - - private AMQQueue createOrRestoreQueue(UUID id, - String queueName, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - boolean deleteOnNoConsumer, - Map<String, Object> arguments, - boolean createInStore) throws QpidSecurityException - { - if (id == null) + if (!attributes.containsKey(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES) && config.getCapacity() != 0) { - throw new IllegalArgumentException("Queue id must not be null"); + attributes.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, config.getCapacity()); } - if (queueName == null) + if (!attributes.containsKey(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES) && config.getFlowResumeCapacity() != 0) { - throw new IllegalArgumentException("Queue name must not be null"); + attributes.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, config.getFlowResumeCapacity()); } - QueueConfiguration queueConfiguration = _virtualHost.getConfiguration().getQueueConfiguration(queueName); - - boolean createDLQ = createDLQ(autoDelete, arguments, queueConfiguration); + boolean createDLQ = createDLQ(attributes, config); if (createDLQ) { validateDLNames(queueName); } - int priorities = 1; - String conflationKey = null; - String sortingKey = null; - - if(arguments != null) - { - if(arguments.containsKey(Queue.LVQ_KEY)) - { - conflationKey = (String) arguments.get(Queue.LVQ_KEY); - if(conflationKey == null) - { - conflationKey = QPID_DEFAULT_LVQ_KEY; - } - } - else if(arguments.containsKey(Queue.PRIORITIES)) - { - Object prioritiesObj = arguments.get(Queue.PRIORITIES); - if(prioritiesObj instanceof Number) - { - priorities = ((Number)prioritiesObj).intValue(); - } - else if(prioritiesObj instanceof String) - { - try - { - priorities = Integer.parseInt(prioritiesObj.toString()); - } - catch (NumberFormatException e) - { - // TODO - should warn here of invalid format - } - } - else - { - // TODO - should warn here of invalid format - } - } - else if(arguments.containsKey(Queue.SORT_KEY)) - { - sortingKey = (String)arguments.get(Queue.SORT_KEY); - } - } + AMQQueue queue; - AMQQueue q; - if(sortingKey != null) + if(attributes.containsKey(Queue.SORT_KEY)) { - q = new SortedQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, sortingKey); + queue = new SortedQueue(_virtualHost, creatingSession, attributes); } - else if(conflationKey != null) + else if(attributes.containsKey(Queue.LVQ_KEY)) { - q = new ConflationQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, conflationKey); + queue = new ConflationQueue(_virtualHost, creatingSession, attributes); } - else if(priorities > 1) + else if(attributes.containsKey(Queue.PRIORITIES)) { - q = new PriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, priorities); + queue = new PriorityQueue(_virtualHost, creatingSession, attributes); } else { - q = new StandardQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments); + queue = new StandardQueue(_virtualHost, creatingSession, attributes); } - q.setDeleteOnNoConsumers(deleteOnNoConsumer); - //Register the new queue - _queueRegistry.registerQueue(q); + _queueRegistry.registerQueue(queue); - q.configure(_virtualHost.getConfiguration().getQueueConfiguration(queueName)); - - if(arguments != null) + if(createDLQ) { - for(QueueProperty p : DECLARABLE_PROPERTIES) - { - if(arguments.containsKey(p.getArgumentName())) - { - p.setPropertyValue(q, arguments.get(p.getArgumentName())); - } - } - - if(arguments.get(Queue.NO_LOCAL) instanceof Boolean) - { - q.setNoLocal((Boolean)arguments.get(Queue.NO_LOCAL)); - } - + createDLQ(queue); } - - if(createDLQ) + else if(attributes != null && attributes.get(Queue.ALTERNATE_EXCHANGE) instanceof String) { - final String dlExchangeName = getDeadLetterExchangeName(queueName); - final String dlQueueName = getDeadLetterQueueName(queueName); - - Exchange dlExchange = null; - final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, _virtualHost.getName()); + final String altExchangeAttr = (String) attributes.get(Queue.ALTERNATE_EXCHANGE); + Exchange altExchange; try { - dlExchange = _virtualHost.createExchange(dlExchangeId, - dlExchangeName, - ExchangeDefaults.FANOUT_EXCHANGE_CLASS, - true, false, null); - } - catch(ExchangeExistsException e) - { - // We're ok if the exchange already exists - dlExchange = e.getExistingExchange(); - } - catch (ReservedExchangeNameException e) - { - throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); - } - catch (AMQUnknownExchangeType e) - { - throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); + altExchange = _virtualHost.getExchange(UUID.fromString(altExchangeAttr)); } - catch (UnknownExchangeException e) + catch(IllegalArgumentException e) { - throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); + altExchange = _virtualHost.getExchange(altExchangeAttr); } + queue.setAlternateExchange(altExchange); + } - AMQQueue dlQueue = null; + if (createInStore && queue.isDurable() && !(queue.getLifetimePolicy() + == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE + || queue.getLifetimePolicy() + == LifetimePolicy.DELETE_ON_SESSION_END)) + { + DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(), queue); + } - synchronized(_queueRegistry) - { - dlQueue = _queueRegistry.getQueue(dlQueueName); + return queue; + } - if(dlQueue == null) - { - //set args to disable DLQ-ing/MDC from the DLQ itself, preventing loops etc - final Map<String, Object> args = new HashMap<String, Object>(); - args.put(Queue.CREATE_DLQ_ON_CREATION, false); - args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0); - - try - { - dlQueue = _virtualHost.createQueue(UUIDGenerator.generateQueueUUID(dlQueueName, _virtualHost.getName()), dlQueueName, true, owner, false, exclusive, - false, args); - } - catch (QueueExistsException e) - { - throw new ServerScopedRuntimeException("Attempt to create a queue failed because the " + - "queue already exists, however this occurred within " + - "a block where the queue existence had previously been " + - "checked, and no queue creation should have been " + - "possible from another thread", e); - } - } - } + private void createDLQ(final AMQQueue queue) throws QpidSecurityException + { + final String queueName = queue.getName(); + final String dlExchangeName = getDeadLetterExchangeName(queueName); + final String dlQueueName = getDeadLetterQueueName(queueName); - //ensure the queue is bound to the exchange - if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue)) - { - //actual routing key used does not matter due to use of fanout exchange, - //but we will make the key 'dlq' as it can be logged at creation. - dlExchange.addBinding(DLQ_ROUTING_KEY, dlQueue, null); - } - q.setAlternateExchange(dlExchange); + Exchange dlExchange = null; + final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, _virtualHost.getName()); + + try + { + dlExchange = _virtualHost.createExchange(dlExchangeId, + dlExchangeName, + ExchangeDefaults.FANOUT_EXCHANGE_CLASS, + true, false, null); + } + catch(ExchangeExistsException e) + { + // We're ok if the exchange already exists + dlExchange = e.getExistingExchange(); + } + catch (ReservedExchangeNameException e) + { + throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); } - else if(arguments != null && arguments.get(Queue.ALTERNATE_EXCHANGE) instanceof String) + catch (AMQUnknownExchangeType e) { + throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); + } + catch (UnknownExchangeException e) + { + throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); + } - final String altExchangeAttr = (String) arguments.get(Queue.ALTERNATE_EXCHANGE); - Exchange altExchange; - try - { - altExchange = _virtualHost.getExchange(UUID.fromString(altExchangeAttr)); - } - catch(IllegalArgumentException e) + AMQQueue dlQueue = null; + + synchronized(_queueRegistry) + { + dlQueue = _queueRegistry.getQueue(dlQueueName); + + if(dlQueue == null) { - altExchange = _virtualHost.getExchange(altExchangeAttr); + //set args to disable DLQ-ing/MDC from the DLQ itself, preventing loops etc + final Map<String, Object> args = new HashMap<String, Object>(); + args.put(Queue.CREATE_DLQ_ON_CREATION, false); + args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0); + + try + { + + + args.put(Queue.ID, UUIDGenerator.generateQueueUUID(dlQueueName, _virtualHost.getName())); + args.put(Queue.NAME, dlQueueName); + args.put(Queue.DURABLE, true); + dlQueue = _virtualHost.createQueue(null, args); + } + catch (QueueExistsException e) + { + throw new ServerScopedRuntimeException("Attempt to create a queue failed because the " + + "queue already exists, however this occurred within " + + "a block where the queue existence had previously been " + + "checked, and no queue creation should have been " + + "possible from another thread", e); + } } - q.setAlternateExchange(altExchange); } - if (createInStore && q.isDurable() && !q.isAutoDelete()) + //ensure the queue is bound to the exchange + if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue)) { - DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(), q); + //actual routing key used does not matter due to use of fanout exchange, + //but we will make the key 'dlq' as it can be logged at creation. + dlExchange.addBinding(DLQ_ROUTING_KEY, dlQueue, null); } - - return q; + queue.setAlternateExchange(dlExchange); } public AMQQueue createAMQQueueImpl(QueueConfiguration config) throws QpidSecurityException { - String queueName = config.getName(); - - boolean durable = config.getDurable(); - boolean autodelete = config.getAutoDelete(); - boolean exclusive = config.getExclusive(); - String owner = config.getOwner(); - Map<String, Object> arguments = createQueueArgumentsFromConfig(config); - - // we need queues that are defined in config to have deterministic ids. - UUID id = UUIDGenerator.generateQueueUUID(queueName, _virtualHost.getName()); - AMQQueue q = createQueue(id, queueName, durable, owner, autodelete, exclusive, false, arguments); - q.configure(config); + Map<String, Object> arguments = createQueueAttributesFromConfig(_virtualHost, config); + + AMQQueue q = createOrRestoreQueue(null, arguments, false); return q; } @@ -471,16 +299,19 @@ public class AMQQueueFactory implements QueueFactory /** * Checks if DLQ is enabled for the queue. * - * @param autoDelete - * queue auto-delete flag * @param arguments * queue arguments * @param qConfig * queue configuration * @return true if DLQ enabled */ - protected static boolean createDLQ(boolean autoDelete, Map<String, Object> arguments, QueueConfiguration qConfig) + protected static boolean createDLQ(Map<String, Object> arguments, QueueConfiguration qConfig) { + boolean autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class, + Queue.LIFETIME_POLICY, + arguments, + LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT; + //feature is not to be enabled for temporary queues or when explicitly disabled by argument if (!(autoDelete || (arguments != null && arguments.containsKey(Queue.ALTERNATE_EXCHANGE)))) { @@ -525,46 +356,59 @@ public class AMQQueueFactory implements QueueFactory return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); } - private static Map<String, Object> createQueueArgumentsFromConfig(QueueConfiguration config) + private static Map<String, Object> createQueueAttributesFromConfig(final VirtualHost virtualHost, + QueueConfiguration config) { - Map<String,Object> arguments = new HashMap<String,Object>(); + Map<String,Object> attributes = new HashMap<String,Object>(); if(config.getArguments() != null && !config.getArguments().isEmpty()) { - arguments.putAll(QueueArgumentsConverter.convertWireArgsToModel(new HashMap<String, Object>(config.getArguments()))); + attributes.putAll(QueueArgumentsConverter.convertWireArgsToModel(new HashMap<String, Object>(config.getArguments()))); } if(config.isLVQ() || config.getLVQKey() != null) { - arguments.put(Queue.LVQ_KEY, config.getLVQKey() == null ? QPID_DEFAULT_LVQ_KEY : config.getLVQKey()); + attributes.put(Queue.LVQ_KEY, + config.getLVQKey() == null ? ConflationQueue.DEFAULT_LVQ_KEY : config.getLVQKey()); } else if (config.getPriority() || config.getPriorities() > 0) { - arguments.put(Queue.PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities()); + attributes.put(Queue.PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities()); } else if (config.getQueueSortKey() != null && !"".equals(config.getQueueSortKey())) { - arguments.put(Queue.SORT_KEY, config.getQueueSortKey()); + attributes.put(Queue.SORT_KEY, config.getQueueSortKey()); } if (!config.getAutoDelete() && config.isDeadLetterQueueEnabled()) { - arguments.put(Queue.CREATE_DLQ_ON_CREATION, true); + attributes.put(Queue.CREATE_DLQ_ON_CREATION, true); } if (config.getDescription() != null && !"".equals(config.getDescription())) { - arguments.put(Queue.DESCRIPTION, config.getDescription()); + attributes.put(Queue.DESCRIPTION, config.getDescription()); } - if (arguments.isEmpty()) + attributes.put(Queue.DURABLE, config.getDurable()); + attributes.put(Queue.LIFETIME_POLICY, + config.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT); + if(config.getExclusive()) { - return Collections.emptyMap(); + attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER); } - else + if(config.getOwner() != null) { - return arguments; + attributes.put(Queue.OWNER, config.getOwner()); } + + attributes.put(Queue.NAME, config.getName()); + + // we need queues that are defined in config to have deterministic ids. + attributes.put(Queue.ID, UUIDGenerator.generateQueueUUID(config.getName(), virtualHost.getName())); + + + return attributes; } } |