summaryrefslogtreecommitdiff
path: root/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java')
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java490
1 files changed, 167 insertions, 323 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index 409c528a4b..5003db1385 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -20,12 +20,14 @@
*/
package org.apache.qpid.server.queue;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.configuration.BrokerProperties;
@@ -36,6 +38,7 @@ import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.ExchangeExistsException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
@@ -45,7 +48,6 @@ import org.apache.qpid.server.virtualhost.QueueExistsException;
public class AMQQueueFactory implements QueueFactory
{
- public static final String QPID_DEFAULT_LVQ_KEY = "qpid.LVQ_key";
public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
@@ -59,381 +61,207 @@ public class AMQQueueFactory implements QueueFactory
{
_virtualHost = virtualHost;
_queueRegistry = queueRegistry;
+ }
+
+ @Override
+ public AMQQueue restoreQueue(Map<String, Object> attributes) throws QpidSecurityException
+ {
+ return createOrRestoreQueue(null, attributes, false);
+
}
- private abstract static class QueueProperty
+ @Override
+ public AMQQueue createQueue(final AMQSessionModel creatingSession,
+ Map<String, Object> attributes) throws QpidSecurityException
+ {
+ return createOrRestoreQueue(creatingSession, attributes, true);
+ }
+
+ private AMQQueue createOrRestoreQueue(final AMQSessionModel creatingSession, Map<String, Object> attributes,
+ boolean createInStore) throws QpidSecurityException
{
- private final String _argumentName;
+ String queueName = MapValueConverter.getStringAttribute(Queue.NAME,attributes);
- public QueueProperty(String argumentName)
+ QueueConfiguration config = _virtualHost.getConfiguration().getQueueConfiguration(queueName);
+
+ if (!attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_AGE) && config.getMaximumMessageAge() != 0)
{
- _argumentName = argumentName;
+ attributes.put(Queue.ALERT_THRESHOLD_MESSAGE_AGE, config.getMaximumMessageAge());
}
-
- public String getArgumentName()
+ if (!attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES) && config.getMaximumQueueDepth() != 0)
{
- return _argumentName;
+ attributes.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, config.getMaximumQueueDepth());
}
-
-
- public abstract void setPropertyValue(AMQQueue queue, Object value);
-
- }
-
- private abstract static class QueueLongProperty extends QueueProperty
- {
-
- public QueueLongProperty(String argumentName)
+ if (!attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_SIZE) && config.getMaximumMessageSize() != 0)
{
- super(argumentName);
+ attributes.put(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, config.getMaximumMessageSize());
}
-
- public void setPropertyValue(AMQQueue queue, Object value)
+ if (!attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES) && config.getMaximumMessageCount() != 0)
{
- if(value instanceof Number)
- {
- setPropertyValue(queue, ((Number)value).longValue());
- }
-
+ attributes.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, config.getMaximumMessageCount());
}
-
- abstract void setPropertyValue(AMQQueue queue, long value);
-
-
- }
-
- private abstract static class QueueIntegerProperty extends QueueProperty
- {
- public QueueIntegerProperty(String argumentName)
+ if (!attributes.containsKey(Queue.ALERT_REPEAT_GAP) && config.getMinimumAlertRepeatGap() != 0)
{
- super(argumentName);
+ attributes.put(Queue.ALERT_REPEAT_GAP, config.getMinimumAlertRepeatGap());
}
-
- public void setPropertyValue(AMQQueue queue, Object value)
+ if (config.getMaxDeliveryCount() != 0 && !attributes.containsKey(Queue.MAXIMUM_DELIVERY_ATTEMPTS))
{
- if(value instanceof Number)
- {
- setPropertyValue(queue, ((Number)value).intValue());
- }
-
+ attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, config.getMaxDeliveryCount());
}
- abstract void setPropertyValue(AMQQueue queue, int value);
- }
-
- private static final QueueProperty[] DECLARABLE_PROPERTIES = {
- new QueueLongProperty(Queue.ALERT_THRESHOLD_MESSAGE_AGE)
- {
- public void setPropertyValue(AMQQueue queue, long value)
- {
- queue.setMaximumMessageAge(value);
- }
- },
- new QueueLongProperty(Queue.ALERT_THRESHOLD_MESSAGE_SIZE)
- {
- public void setPropertyValue(AMQQueue queue, long value)
- {
- queue.setMaximumMessageSize(value);
- }
- },
- new QueueLongProperty(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES)
- {
- public void setPropertyValue(AMQQueue queue, long value)
- {
- queue.setMaximumMessageCount(value);
- }
- },
- new QueueLongProperty(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES)
- {
- public void setPropertyValue(AMQQueue queue, long value)
- {
- queue.setMaximumQueueDepth(value);
- }
- },
- new QueueLongProperty(Queue.ALERT_REPEAT_GAP)
- {
- public void setPropertyValue(AMQQueue queue, long value)
- {
- queue.setMinimumAlertRepeatGap(value);
- }
- },
- new QueueLongProperty(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES)
- {
- public void setPropertyValue(AMQQueue queue, long value)
- {
- queue.setCapacity(value);
- }
- },
- new QueueLongProperty(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES)
- {
- public void setPropertyValue(AMQQueue queue, long value)
- {
- queue.setFlowResumeCapacity(value);
- }
- },
- new QueueIntegerProperty(Queue.MAXIMUM_DELIVERY_ATTEMPTS)
- {
- public void setPropertyValue(AMQQueue queue, int value)
- {
- queue.setMaximumDeliveryCount(value);
- }
- }
- };
-
- @Override
- public AMQQueue restoreQueue(UUID id,
- String queueName,
- String owner,
- boolean autoDelete,
- boolean exclusive,
- boolean deleteOnNoConsumer,
- Map<String, Object> arguments) throws QpidSecurityException
- {
- return createOrRestoreQueue(id, queueName, true, owner, autoDelete, exclusive, deleteOnNoConsumer, arguments, false);
-
- }
-
- /**
- * @param id the id to use.
- * @param deleteOnNoConsumer
- */
- @Override
- public AMQQueue createQueue(UUID id,
- String queueName,
- boolean durable,
- String owner,
- boolean autoDelete,
- boolean exclusive,
- boolean deleteOnNoConsumer,
- Map<String, Object> arguments) throws QpidSecurityException
- {
- return createOrRestoreQueue(id, queueName, durable, owner, autoDelete, exclusive, deleteOnNoConsumer, arguments, true);
- }
-
- private AMQQueue createOrRestoreQueue(UUID id,
- String queueName,
- boolean durable,
- String owner,
- boolean autoDelete,
- boolean exclusive,
- boolean deleteOnNoConsumer,
- Map<String, Object> arguments,
- boolean createInStore) throws QpidSecurityException
- {
- if (id == null)
+ if (!attributes.containsKey(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES) && config.getCapacity() != 0)
{
- throw new IllegalArgumentException("Queue id must not be null");
+ attributes.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, config.getCapacity());
}
- if (queueName == null)
+ if (!attributes.containsKey(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES) && config.getFlowResumeCapacity() != 0)
{
- throw new IllegalArgumentException("Queue name must not be null");
+ attributes.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, config.getFlowResumeCapacity());
}
- QueueConfiguration queueConfiguration = _virtualHost.getConfiguration().getQueueConfiguration(queueName);
-
- boolean createDLQ = createDLQ(autoDelete, arguments, queueConfiguration);
+ boolean createDLQ = createDLQ(attributes, config);
if (createDLQ)
{
validateDLNames(queueName);
}
- int priorities = 1;
- String conflationKey = null;
- String sortingKey = null;
-
- if(arguments != null)
- {
- if(arguments.containsKey(Queue.LVQ_KEY))
- {
- conflationKey = (String) arguments.get(Queue.LVQ_KEY);
- if(conflationKey == null)
- {
- conflationKey = QPID_DEFAULT_LVQ_KEY;
- }
- }
- else if(arguments.containsKey(Queue.PRIORITIES))
- {
- Object prioritiesObj = arguments.get(Queue.PRIORITIES);
- if(prioritiesObj instanceof Number)
- {
- priorities = ((Number)prioritiesObj).intValue();
- }
- else if(prioritiesObj instanceof String)
- {
- try
- {
- priorities = Integer.parseInt(prioritiesObj.toString());
- }
- catch (NumberFormatException e)
- {
- // TODO - should warn here of invalid format
- }
- }
- else
- {
- // TODO - should warn here of invalid format
- }
- }
- else if(arguments.containsKey(Queue.SORT_KEY))
- {
- sortingKey = (String)arguments.get(Queue.SORT_KEY);
- }
- }
+ AMQQueue queue;
- AMQQueue q;
- if(sortingKey != null)
+ if(attributes.containsKey(Queue.SORT_KEY))
{
- q = new SortedQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, sortingKey);
+ queue = new SortedQueue(_virtualHost, creatingSession, attributes);
}
- else if(conflationKey != null)
+ else if(attributes.containsKey(Queue.LVQ_KEY))
{
- q = new ConflationQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, conflationKey);
+ queue = new ConflationQueue(_virtualHost, creatingSession, attributes);
}
- else if(priorities > 1)
+ else if(attributes.containsKey(Queue.PRIORITIES))
{
- q = new PriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, priorities);
+ queue = new PriorityQueue(_virtualHost, creatingSession, attributes);
}
else
{
- q = new StandardQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments);
+ queue = new StandardQueue(_virtualHost, creatingSession, attributes);
}
- q.setDeleteOnNoConsumers(deleteOnNoConsumer);
-
//Register the new queue
- _queueRegistry.registerQueue(q);
+ _queueRegistry.registerQueue(queue);
- q.configure(_virtualHost.getConfiguration().getQueueConfiguration(queueName));
-
- if(arguments != null)
+ if(createDLQ)
{
- for(QueueProperty p : DECLARABLE_PROPERTIES)
- {
- if(arguments.containsKey(p.getArgumentName()))
- {
- p.setPropertyValue(q, arguments.get(p.getArgumentName()));
- }
- }
-
- if(arguments.get(Queue.NO_LOCAL) instanceof Boolean)
- {
- q.setNoLocal((Boolean)arguments.get(Queue.NO_LOCAL));
- }
-
+ createDLQ(queue);
}
-
- if(createDLQ)
+ else if(attributes != null && attributes.get(Queue.ALTERNATE_EXCHANGE) instanceof String)
{
- final String dlExchangeName = getDeadLetterExchangeName(queueName);
- final String dlQueueName = getDeadLetterQueueName(queueName);
-
- Exchange dlExchange = null;
- final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, _virtualHost.getName());
+ final String altExchangeAttr = (String) attributes.get(Queue.ALTERNATE_EXCHANGE);
+ Exchange altExchange;
try
{
- dlExchange = _virtualHost.createExchange(dlExchangeId,
- dlExchangeName,
- ExchangeDefaults.FANOUT_EXCHANGE_CLASS,
- true, false, null);
- }
- catch(ExchangeExistsException e)
- {
- // We're ok if the exchange already exists
- dlExchange = e.getExistingExchange();
- }
- catch (ReservedExchangeNameException e)
- {
- throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e);
- }
- catch (AMQUnknownExchangeType e)
- {
- throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e);
+ altExchange = _virtualHost.getExchange(UUID.fromString(altExchangeAttr));
}
- catch (UnknownExchangeException e)
+ catch(IllegalArgumentException e)
{
- throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e);
+ altExchange = _virtualHost.getExchange(altExchangeAttr);
}
+ queue.setAlternateExchange(altExchange);
+ }
- AMQQueue dlQueue = null;
+ if (createInStore && queue.isDurable() && !(queue.getLifetimePolicy()
+ == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
+ || queue.getLifetimePolicy()
+ == LifetimePolicy.DELETE_ON_SESSION_END))
+ {
+ DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(), queue);
+ }
- synchronized(_queueRegistry)
- {
- dlQueue = _queueRegistry.getQueue(dlQueueName);
+ return queue;
+ }
- if(dlQueue == null)
- {
- //set args to disable DLQ-ing/MDC from the DLQ itself, preventing loops etc
- final Map<String, Object> args = new HashMap<String, Object>();
- args.put(Queue.CREATE_DLQ_ON_CREATION, false);
- args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0);
-
- try
- {
- dlQueue = _virtualHost.createQueue(UUIDGenerator.generateQueueUUID(dlQueueName, _virtualHost.getName()), dlQueueName, true, owner, false, exclusive,
- false, args);
- }
- catch (QueueExistsException e)
- {
- throw new ServerScopedRuntimeException("Attempt to create a queue failed because the " +
- "queue already exists, however this occurred within " +
- "a block where the queue existence had previously been " +
- "checked, and no queue creation should have been " +
- "possible from another thread", e);
- }
- }
- }
+ private void createDLQ(final AMQQueue queue) throws QpidSecurityException
+ {
+ final String queueName = queue.getName();
+ final String dlExchangeName = getDeadLetterExchangeName(queueName);
+ final String dlQueueName = getDeadLetterQueueName(queueName);
- //ensure the queue is bound to the exchange
- if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue))
- {
- //actual routing key used does not matter due to use of fanout exchange,
- //but we will make the key 'dlq' as it can be logged at creation.
- dlExchange.addBinding(DLQ_ROUTING_KEY, dlQueue, null);
- }
- q.setAlternateExchange(dlExchange);
+ Exchange dlExchange = null;
+ final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, _virtualHost.getName());
+
+ try
+ {
+ dlExchange = _virtualHost.createExchange(dlExchangeId,
+ dlExchangeName,
+ ExchangeDefaults.FANOUT_EXCHANGE_CLASS,
+ true, false, null);
+ }
+ catch(ExchangeExistsException e)
+ {
+ // We're ok if the exchange already exists
+ dlExchange = e.getExistingExchange();
+ }
+ catch (ReservedExchangeNameException e)
+ {
+ throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e);
}
- else if(arguments != null && arguments.get(Queue.ALTERNATE_EXCHANGE) instanceof String)
+ catch (AMQUnknownExchangeType e)
{
+ throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e);
+ }
+ catch (UnknownExchangeException e)
+ {
+ throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e);
+ }
- final String altExchangeAttr = (String) arguments.get(Queue.ALTERNATE_EXCHANGE);
- Exchange altExchange;
- try
- {
- altExchange = _virtualHost.getExchange(UUID.fromString(altExchangeAttr));
- }
- catch(IllegalArgumentException e)
+ AMQQueue dlQueue = null;
+
+ synchronized(_queueRegistry)
+ {
+ dlQueue = _queueRegistry.getQueue(dlQueueName);
+
+ if(dlQueue == null)
{
- altExchange = _virtualHost.getExchange(altExchangeAttr);
+ //set args to disable DLQ-ing/MDC from the DLQ itself, preventing loops etc
+ final Map<String, Object> args = new HashMap<String, Object>();
+ args.put(Queue.CREATE_DLQ_ON_CREATION, false);
+ args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0);
+
+ try
+ {
+
+
+ args.put(Queue.ID, UUIDGenerator.generateQueueUUID(dlQueueName, _virtualHost.getName()));
+ args.put(Queue.NAME, dlQueueName);
+ args.put(Queue.DURABLE, true);
+ dlQueue = _virtualHost.createQueue(null, args);
+ }
+ catch (QueueExistsException e)
+ {
+ throw new ServerScopedRuntimeException("Attempt to create a queue failed because the " +
+ "queue already exists, however this occurred within " +
+ "a block where the queue existence had previously been " +
+ "checked, and no queue creation should have been " +
+ "possible from another thread", e);
+ }
}
- q.setAlternateExchange(altExchange);
}
- if (createInStore && q.isDurable() && !q.isAutoDelete())
+ //ensure the queue is bound to the exchange
+ if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue))
{
- DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(), q);
+ //actual routing key used does not matter due to use of fanout exchange,
+ //but we will make the key 'dlq' as it can be logged at creation.
+ dlExchange.addBinding(DLQ_ROUTING_KEY, dlQueue, null);
}
-
- return q;
+ queue.setAlternateExchange(dlExchange);
}
public AMQQueue createAMQQueueImpl(QueueConfiguration config) throws QpidSecurityException
{
- String queueName = config.getName();
-
- boolean durable = config.getDurable();
- boolean autodelete = config.getAutoDelete();
- boolean exclusive = config.getExclusive();
- String owner = config.getOwner();
- Map<String, Object> arguments = createQueueArgumentsFromConfig(config);
-
- // we need queues that are defined in config to have deterministic ids.
- UUID id = UUIDGenerator.generateQueueUUID(queueName, _virtualHost.getName());
- AMQQueue q = createQueue(id, queueName, durable, owner, autodelete, exclusive, false, arguments);
- q.configure(config);
+ Map<String, Object> arguments = createQueueAttributesFromConfig(_virtualHost, config);
+
+ AMQQueue q = createOrRestoreQueue(null, arguments, false);
return q;
}
@@ -471,16 +299,19 @@ public class AMQQueueFactory implements QueueFactory
/**
* Checks if DLQ is enabled for the queue.
*
- * @param autoDelete
- * queue auto-delete flag
* @param arguments
* queue arguments
* @param qConfig
* queue configuration
* @return true if DLQ enabled
*/
- protected static boolean createDLQ(boolean autoDelete, Map<String, Object> arguments, QueueConfiguration qConfig)
+ protected static boolean createDLQ(Map<String, Object> arguments, QueueConfiguration qConfig)
{
+ boolean autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class,
+ Queue.LIFETIME_POLICY,
+ arguments,
+ LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT;
+
//feature is not to be enabled for temporary queues or when explicitly disabled by argument
if (!(autoDelete || (arguments != null && arguments.containsKey(Queue.ALTERNATE_EXCHANGE))))
{
@@ -525,46 +356,59 @@ public class AMQQueueFactory implements QueueFactory
return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
}
- private static Map<String, Object> createQueueArgumentsFromConfig(QueueConfiguration config)
+ private static Map<String, Object> createQueueAttributesFromConfig(final VirtualHost virtualHost,
+ QueueConfiguration config)
{
- Map<String,Object> arguments = new HashMap<String,Object>();
+ Map<String,Object> attributes = new HashMap<String,Object>();
if(config.getArguments() != null && !config.getArguments().isEmpty())
{
- arguments.putAll(QueueArgumentsConverter.convertWireArgsToModel(new HashMap<String, Object>(config.getArguments())));
+ attributes.putAll(QueueArgumentsConverter.convertWireArgsToModel(new HashMap<String, Object>(config.getArguments())));
}
if(config.isLVQ() || config.getLVQKey() != null)
{
- arguments.put(Queue.LVQ_KEY, config.getLVQKey() == null ? QPID_DEFAULT_LVQ_KEY : config.getLVQKey());
+ attributes.put(Queue.LVQ_KEY,
+ config.getLVQKey() == null ? ConflationQueue.DEFAULT_LVQ_KEY : config.getLVQKey());
}
else if (config.getPriority() || config.getPriorities() > 0)
{
- arguments.put(Queue.PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities());
+ attributes.put(Queue.PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities());
}
else if (config.getQueueSortKey() != null && !"".equals(config.getQueueSortKey()))
{
- arguments.put(Queue.SORT_KEY, config.getQueueSortKey());
+ attributes.put(Queue.SORT_KEY, config.getQueueSortKey());
}
if (!config.getAutoDelete() && config.isDeadLetterQueueEnabled())
{
- arguments.put(Queue.CREATE_DLQ_ON_CREATION, true);
+ attributes.put(Queue.CREATE_DLQ_ON_CREATION, true);
}
if (config.getDescription() != null && !"".equals(config.getDescription()))
{
- arguments.put(Queue.DESCRIPTION, config.getDescription());
+ attributes.put(Queue.DESCRIPTION, config.getDescription());
}
- if (arguments.isEmpty())
+ attributes.put(Queue.DURABLE, config.getDurable());
+ attributes.put(Queue.LIFETIME_POLICY,
+ config.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT);
+ if(config.getExclusive())
{
- return Collections.emptyMap();
+ attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER);
}
- else
+ if(config.getOwner() != null)
{
- return arguments;
+ attributes.put(Queue.OWNER, config.getOwner());
}
+
+ attributes.put(Queue.NAME, config.getName());
+
+ // we need queues that are defined in config to have deterministic ids.
+ attributes.put(Queue.ID, UUIDGenerator.generateQueueUUID(config.getName(), virtualHost.getName()));
+
+
+ return attributes;
}
}