diff options
Diffstat (limited to 'java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java')
-rw-r--r-- | java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java | 109 |
1 files changed, 85 insertions, 24 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index aa70bb3e8d..0e9b879316 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.virtualhost; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -34,6 +35,9 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.server.exchange.AMQUnknownExchangeType; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.server.configuration.ExchangeConfiguration; import org.apache.qpid.server.configuration.QueueConfiguration; @@ -69,7 +73,9 @@ import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.DtxRegistry; +import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.util.ServerScopedRuntimeException; public abstract class AbstractVirtualHost implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener @@ -529,7 +535,10 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg int purged = queue.delete(); getQueueRegistry().unregisterQueue(queue.getName()); - if (queue.isDurable() && !queue.isAutoDelete()) + if (queue.isDurable() && !(queue.getLifetimePolicy() + == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE + || queue.getLifetimePolicy() + == LifetimePolicy.DELETE_ON_SESSION_END)) { DurableConfigurationStore store = getDurableConfigurationStore(); DurableConfigurationStoreHelper.removeQueue(store, queue); @@ -538,26 +547,24 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg } } - @Override - public AMQQueue createQueue(UUID id, - String queueName, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - boolean deleteOnNoConsumer, - Map<String, Object> arguments) throws QpidSecurityException, QueueExistsException + public AMQQueue createQueue(final AMQSessionModel creatingSession, Map<String, Object> attributes) throws QpidSecurityException, QueueExistsException { + // make a copy as we may augment (with an ID for example) + attributes = new LinkedHashMap<String, Object>(attributes); - if (queueName == null) - { - throw new IllegalArgumentException("Queue name must not be null"); - } + String queueName = MapValueConverter.getStringAttribute(Queue.NAME, attributes); + boolean autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class, + Queue.LIFETIME_POLICY, + attributes, + LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT; + boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE, attributes, false); + ExclusivityPolicy exclusive = MapValueConverter.getEnumAttribute(ExclusivityPolicy.class,Queue.EXCLUSIVE, attributes, ExclusivityPolicy.NONE); + String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null); - // Access check + // Access check if (!getSecurityManager().authoriseCreateQueue(autoDelete, durable, - exclusive, + exclusive != null && exclusive != ExclusivityPolicy.NONE, null, null, queueName, @@ -573,22 +580,27 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg { throw new QueueExistsException("Queue with name " + queueName + " already exists", _queueRegistry.getQueue(queueName)); } - if(id == null) + if(!attributes.containsKey(Queue.ID)) { - id = UUIDGenerator.generateExchangeUUID(queueName, getName()); + UUID id = UUIDGenerator.generateExchangeUUID(queueName, getName()); while(_queueRegistry.getQueue(id) != null) { id = UUID.randomUUID(); } + attributes.put(Queue.ID, id); } - else if(_queueRegistry.getQueue(id) != null) + else if(_queueRegistry.getQueue(MapValueConverter.getUUIDAttribute(Queue.ID, attributes)) != null) { - throw new QueueExistsException("Queue with id " + id + " already exists", _queueRegistry.getQueue(queueName)); + throw new QueueExistsException("Queue with id " + + MapValueConverter.getUUIDAttribute(Queue.ID, + attributes) + + " already exists", _queueRegistry.getQueue(queueName)); } - return _queueFactory.createQueue(id, queueName, durable, owner, autoDelete, exclusive, deleteOnNoConsumer, - arguments); + + + return _queueFactory.createQueue(creatingSession, attributes); } } @@ -980,13 +992,13 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg // house keeping task from running. } } - for (AMQConnectionModel connection : getConnectionRegistry().getConnections()) + for (AMQConnectionModel<?,?> connection : getConnectionRegistry().getConnections()) { if (_logger.isDebugEnabled()) { _logger.debug("Checking for long running open transactions on connection " + connection); } - for (AMQSessionModel session : connection.getSessionModels()) + for (AMQSessionModel<?,?> session : connection.getSessionModels()) { if (_logger.isDebugEnabled()) { @@ -1046,5 +1058,54 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg { return _model; } + + } + + @Override + public long getDefaultAlertThresholdMessageAge() + { + return getConfiguration().getMaximumMessageAge(); + } + + @Override + public long getDefaultAlertThresholdMessageSize() + { + return getConfiguration().getMaximumMessageSize(); + } + + @Override + public long getDefaultAlertThresholdQueueDepthMessages() + { + return getConfiguration().getMaximumMessageCount(); + } + + @Override + public long getDefaultAlertThresholdQueueDepthBytes() + { + return getConfiguration().getMaximumQueueDepth(); + } + + @Override + public long getDefaultAlertRepeatGap() + { + return getConfiguration().getMinimumAlertRepeatGap(); + } + + @Override + public long getDefaultQueueFlowControlSizeBytes() + { + return getConfiguration().getCapacity(); + } + + @Override + public long getDefaultQueueFlowResumeSizeBytes() + { + return getConfiguration().getFlowResumeCapacity(); + } + + @Override + public int getDefaultMaximumDeliveryAttempts() + { + return getConfiguration().getMaxDeliveryCount(); } } |