summaryrefslogtreecommitdiff
path: root/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java')
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java109
1 files changed, 85 insertions, 24 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index aa70bb3e8d..0e9b879316 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.virtualhost;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -34,6 +35,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.server.configuration.ExchangeConfiguration;
import org.apache.qpid.server.configuration.QueueConfiguration;
@@ -69,7 +73,9 @@ import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.DtxRegistry;
+import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
public abstract class AbstractVirtualHost implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener
@@ -529,7 +535,10 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
int purged = queue.delete();
getQueueRegistry().unregisterQueue(queue.getName());
- if (queue.isDurable() && !queue.isAutoDelete())
+ if (queue.isDurable() && !(queue.getLifetimePolicy()
+ == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
+ || queue.getLifetimePolicy()
+ == LifetimePolicy.DELETE_ON_SESSION_END))
{
DurableConfigurationStore store = getDurableConfigurationStore();
DurableConfigurationStoreHelper.removeQueue(store, queue);
@@ -538,26 +547,24 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
}
}
- @Override
- public AMQQueue createQueue(UUID id,
- String queueName,
- boolean durable,
- String owner,
- boolean autoDelete,
- boolean exclusive,
- boolean deleteOnNoConsumer,
- Map<String, Object> arguments) throws QpidSecurityException, QueueExistsException
+ public AMQQueue createQueue(final AMQSessionModel creatingSession, Map<String, Object> attributes) throws QpidSecurityException, QueueExistsException
{
+ // make a copy as we may augment (with an ID for example)
+ attributes = new LinkedHashMap<String, Object>(attributes);
- if (queueName == null)
- {
- throw new IllegalArgumentException("Queue name must not be null");
- }
+ String queueName = MapValueConverter.getStringAttribute(Queue.NAME, attributes);
+ boolean autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class,
+ Queue.LIFETIME_POLICY,
+ attributes,
+ LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT;
+ boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE, attributes, false);
+ ExclusivityPolicy exclusive = MapValueConverter.getEnumAttribute(ExclusivityPolicy.class,Queue.EXCLUSIVE, attributes, ExclusivityPolicy.NONE);
+ String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null);
- // Access check
+ // Access check
if (!getSecurityManager().authoriseCreateQueue(autoDelete,
durable,
- exclusive,
+ exclusive != null && exclusive != ExclusivityPolicy.NONE,
null,
null,
queueName,
@@ -573,22 +580,27 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
{
throw new QueueExistsException("Queue with name " + queueName + " already exists", _queueRegistry.getQueue(queueName));
}
- if(id == null)
+ if(!attributes.containsKey(Queue.ID))
{
- id = UUIDGenerator.generateExchangeUUID(queueName, getName());
+ UUID id = UUIDGenerator.generateExchangeUUID(queueName, getName());
while(_queueRegistry.getQueue(id) != null)
{
id = UUID.randomUUID();
}
+ attributes.put(Queue.ID, id);
}
- else if(_queueRegistry.getQueue(id) != null)
+ else if(_queueRegistry.getQueue(MapValueConverter.getUUIDAttribute(Queue.ID, attributes)) != null)
{
- throw new QueueExistsException("Queue with id " + id + " already exists", _queueRegistry.getQueue(queueName));
+ throw new QueueExistsException("Queue with id "
+ + MapValueConverter.getUUIDAttribute(Queue.ID,
+ attributes)
+ + " already exists", _queueRegistry.getQueue(queueName));
}
- return _queueFactory.createQueue(id, queueName, durable, owner, autoDelete, exclusive, deleteOnNoConsumer,
- arguments);
+
+
+ return _queueFactory.createQueue(creatingSession, attributes);
}
}
@@ -980,13 +992,13 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
// house keeping task from running.
}
}
- for (AMQConnectionModel connection : getConnectionRegistry().getConnections())
+ for (AMQConnectionModel<?,?> connection : getConnectionRegistry().getConnections())
{
if (_logger.isDebugEnabled())
{
_logger.debug("Checking for long running open transactions on connection " + connection);
}
- for (AMQSessionModel session : connection.getSessionModels())
+ for (AMQSessionModel<?,?> session : connection.getSessionModels())
{
if (_logger.isDebugEnabled())
{
@@ -1046,5 +1058,54 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
{
return _model;
}
+
+ }
+
+ @Override
+ public long getDefaultAlertThresholdMessageAge()
+ {
+ return getConfiguration().getMaximumMessageAge();
+ }
+
+ @Override
+ public long getDefaultAlertThresholdMessageSize()
+ {
+ return getConfiguration().getMaximumMessageSize();
+ }
+
+ @Override
+ public long getDefaultAlertThresholdQueueDepthMessages()
+ {
+ return getConfiguration().getMaximumMessageCount();
+ }
+
+ @Override
+ public long getDefaultAlertThresholdQueueDepthBytes()
+ {
+ return getConfiguration().getMaximumQueueDepth();
+ }
+
+ @Override
+ public long getDefaultAlertRepeatGap()
+ {
+ return getConfiguration().getMinimumAlertRepeatGap();
+ }
+
+ @Override
+ public long getDefaultQueueFlowControlSizeBytes()
+ {
+ return getConfiguration().getCapacity();
+ }
+
+ @Override
+ public long getDefaultQueueFlowResumeSizeBytes()
+ {
+ return getConfiguration().getFlowResumeCapacity();
+ }
+
+ @Override
+ public int getDefaultMaximumDeliveryAttempts()
+ {
+ return getConfiguration().getMaxDeliveryCount();
}
}