diff options
Diffstat (limited to 'java/broker-core/src/main/java/org/apache/qpid/server/virtualhost')
5 files changed, 155 insertions, 43 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index aa70bb3e8d..0e9b879316 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.virtualhost; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -34,6 +35,9 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.server.exchange.AMQUnknownExchangeType; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.server.configuration.ExchangeConfiguration; import org.apache.qpid.server.configuration.QueueConfiguration; @@ -69,7 +73,9 @@ import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.DtxRegistry; +import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.util.ServerScopedRuntimeException; public abstract class AbstractVirtualHost implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener @@ -529,7 +535,10 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg int purged = queue.delete(); getQueueRegistry().unregisterQueue(queue.getName()); - if (queue.isDurable() && !queue.isAutoDelete()) + if (queue.isDurable() && !(queue.getLifetimePolicy() + == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE + || queue.getLifetimePolicy() + == LifetimePolicy.DELETE_ON_SESSION_END)) { DurableConfigurationStore store = getDurableConfigurationStore(); DurableConfigurationStoreHelper.removeQueue(store, queue); @@ -538,26 +547,24 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg } } - @Override - public AMQQueue createQueue(UUID id, - String queueName, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - boolean deleteOnNoConsumer, - Map<String, Object> arguments) throws QpidSecurityException, QueueExistsException + public AMQQueue createQueue(final AMQSessionModel creatingSession, Map<String, Object> attributes) throws QpidSecurityException, QueueExistsException { + // make a copy as we may augment (with an ID for example) + attributes = new LinkedHashMap<String, Object>(attributes); - if (queueName == null) - { - throw new IllegalArgumentException("Queue name must not be null"); - } + String queueName = MapValueConverter.getStringAttribute(Queue.NAME, attributes); + boolean autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class, + Queue.LIFETIME_POLICY, + attributes, + LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT; + boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE, attributes, false); + ExclusivityPolicy exclusive = MapValueConverter.getEnumAttribute(ExclusivityPolicy.class,Queue.EXCLUSIVE, attributes, ExclusivityPolicy.NONE); + String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null); - // Access check + // Access check if (!getSecurityManager().authoriseCreateQueue(autoDelete, durable, - exclusive, + exclusive != null && exclusive != ExclusivityPolicy.NONE, null, null, queueName, @@ -573,22 +580,27 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg { throw new QueueExistsException("Queue with name " + queueName + " already exists", _queueRegistry.getQueue(queueName)); } - if(id == null) + if(!attributes.containsKey(Queue.ID)) { - id = UUIDGenerator.generateExchangeUUID(queueName, getName()); + UUID id = UUIDGenerator.generateExchangeUUID(queueName, getName()); while(_queueRegistry.getQueue(id) != null) { id = UUID.randomUUID(); } + attributes.put(Queue.ID, id); } - else if(_queueRegistry.getQueue(id) != null) + else if(_queueRegistry.getQueue(MapValueConverter.getUUIDAttribute(Queue.ID, attributes)) != null) { - throw new QueueExistsException("Queue with id " + id + " already exists", _queueRegistry.getQueue(queueName)); + throw new QueueExistsException("Queue with id " + + MapValueConverter.getUUIDAttribute(Queue.ID, + attributes) + + " already exists", _queueRegistry.getQueue(queueName)); } - return _queueFactory.createQueue(id, queueName, durable, owner, autoDelete, exclusive, deleteOnNoConsumer, - arguments); + + + return _queueFactory.createQueue(creatingSession, attributes); } } @@ -980,13 +992,13 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg // house keeping task from running. } } - for (AMQConnectionModel connection : getConnectionRegistry().getConnections()) + for (AMQConnectionModel<?,?> connection : getConnectionRegistry().getConnections()) { if (_logger.isDebugEnabled()) { _logger.debug("Checking for long running open transactions on connection " + connection); } - for (AMQSessionModel session : connection.getSessionModels()) + for (AMQSessionModel<?,?> session : connection.getSessionModels()) { if (_logger.isDebugEnabled()) { @@ -1046,5 +1058,54 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg { return _model; } + + } + + @Override + public long getDefaultAlertThresholdMessageAge() + { + return getConfiguration().getMaximumMessageAge(); + } + + @Override + public long getDefaultAlertThresholdMessageSize() + { + return getConfiguration().getMaximumMessageSize(); + } + + @Override + public long getDefaultAlertThresholdQueueDepthMessages() + { + return getConfiguration().getMaximumMessageCount(); + } + + @Override + public long getDefaultAlertThresholdQueueDepthBytes() + { + return getConfiguration().getMaximumQueueDepth(); + } + + @Override + public long getDefaultAlertRepeatGap() + { + return getConfiguration().getMinimumAlertRepeatGap(); + } + + @Override + public long getDefaultQueueFlowControlSizeBytes() + { + return getConfiguration().getCapacity(); + } + + @Override + public long getDefaultQueueFlowResumeSizeBytes() + { + return getConfiguration().getFlowResumeCapacity(); + } + + @Override + public int getDefaultMaximumDeliveryAttempts() + { + return getConfiguration().getMaxDeliveryCount(); } } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java index 12f8c7dae8..88caf73032 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java @@ -42,6 +42,7 @@ import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION; public class DefaultUpgraderProvider implements UpgraderProvider { + public static final String EXCLUSIVE = "exclusive"; private final ExchangeRegistry _exchangeRegistry; private final VirtualHost _virtualHost; @@ -63,7 +64,8 @@ public class DefaultUpgraderProvider implements UpgraderProvider currentUpgrader = addUpgrader(currentUpgrader, new Version1Upgrader()); case 2: currentUpgrader = addUpgrader(currentUpgrader, new Version2Upgrader()); - + case 3: + currentUpgrader = addUpgrader(currentUpgrader, new Version3Upgrader()); case CURRENT_CONFIG_VERSION: currentUpgrader = addUpgrader(currentUpgrader, new NullUpgrader(recoverer)); break; @@ -263,4 +265,49 @@ public class DefaultUpgraderProvider implements UpgraderProvider } } + /* + * Convert the storage of queue attribute exclusive to change exclusive from a boolean to an enum + * where exclusive was false it will now be "NONE", and where true it will now be "CONTAINER" + * ensure OWNER is null unless the exclusivity policy is CONTAINER + */ + private class Version3Upgrader extends NonNullUpgrader + { + + @Override + public void configuredObject(UUID id, String type, Map<String, Object> attributes) + { + if(Queue.class.getSimpleName().equals(type)) + { + Map<String, Object> newAttributes = new LinkedHashMap<String, Object>(attributes); + if(attributes.get(EXCLUSIVE) instanceof Boolean) + { + boolean isExclusive = (Boolean) attributes.get(EXCLUSIVE); + newAttributes.put(EXCLUSIVE, isExclusive ? "CONTAINER" : "NONE"); + if(!isExclusive && attributes.containsKey("owner")) + { + newAttributes.remove("owner"); + } + } + else + { + newAttributes.remove("owner"); + } + if(!attributes.containsKey("durable")) + { + newAttributes.put("durable","true"); + } + attributes = newAttributes; + getUpdateMap().put(id, new ConfiguredObjectRecord(id,type,attributes)); + } + + getNextUpgrader().configuredObject(id,type,attributes); + } + + @Override + public void complete() + { + getNextUpgrader().complete(); + } + } + } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java index 59ff1ce6a1..c687cbda92 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java @@ -69,7 +69,7 @@ public class ExchangeRecoverer extends AbstractDurableConfiguredObjectRecoverer< String exchangeType = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.TYPE); String lifeTimePolicy = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY); boolean autoDelete = lifeTimePolicy == null - || LifetimePolicy.valueOf(lifeTimePolicy) == LifetimePolicy.AUTO_DELETE; + || LifetimePolicy.valueOf(lifeTimePolicy) != LifetimePolicy.PERMANENT; try { _exchange = _exchangeRegistry.getExchange(id); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java index fd836fdd98..621ea02059 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java @@ -104,13 +104,6 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ public AMQQueue resolve() { String queueName = (String) _attributes.get(Queue.NAME); - String owner = (String) _attributes.get(Queue.OWNER); - boolean exclusive = (Boolean) _attributes.get(Queue.EXCLUSIVE); - - Map<String, Object> queueArgumentsMap = new LinkedHashMap<String, Object>(_attributes); - queueArgumentsMap.remove(Queue.NAME); - queueArgumentsMap.remove(Queue.OWNER); - queueArgumentsMap.remove(Queue.EXCLUSIVE); try { @@ -122,8 +115,9 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ if (_queue == null) { - _queue = _queueFactory.restoreQueue(_id, queueName, owner, false, exclusive, - false, queueArgumentsMap); + Map<String, Object> attributes = new LinkedHashMap<String, Object>(_attributes); + attributes.put(Queue.ID, _id); + _queue = _queueFactory.restoreQueue(attributes); } } catch (QpidSecurityException e) diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 7cd9045af8..9996684bad 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.QpidSecurityException; @@ -59,14 +60,7 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable int removeQueue(AMQQueue queue) throws QpidSecurityException; - AMQQueue createQueue(UUID id, - String queueName, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - boolean deleteOnNoConsumer, - Map<String, Object> arguments) throws QueueExistsException, QpidSecurityException; + AMQQueue createQueue(final AMQSessionModel creatingSession, Map<String, Object> arguments) throws QueueExistsException, QpidSecurityException; Exchange createExchange(UUID id, @@ -130,4 +124,20 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable public void block(); public void unblock(); + + long getDefaultAlertThresholdMessageAge(); + + long getDefaultAlertThresholdMessageSize(); + + long getDefaultAlertThresholdQueueDepthMessages(); + + long getDefaultAlertThresholdQueueDepthBytes(); + + long getDefaultAlertRepeatGap(); + + long getDefaultQueueFlowControlSizeBytes(); + + long getDefaultQueueFlowResumeSizeBytes(); + + int getDefaultMaximumDeliveryAttempts(); } |