summaryrefslogtreecommitdiff
path: root/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-core/src/main/java/org/apache/qpid/server/virtualhost')
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java109
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java49
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java2
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java12
-rwxr-xr-xjava/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java26
5 files changed, 155 insertions, 43 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index aa70bb3e8d..0e9b879316 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.virtualhost;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -34,6 +35,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.server.configuration.ExchangeConfiguration;
import org.apache.qpid.server.configuration.QueueConfiguration;
@@ -69,7 +73,9 @@ import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.DtxRegistry;
+import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
public abstract class AbstractVirtualHost implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener
@@ -529,7 +535,10 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
int purged = queue.delete();
getQueueRegistry().unregisterQueue(queue.getName());
- if (queue.isDurable() && !queue.isAutoDelete())
+ if (queue.isDurable() && !(queue.getLifetimePolicy()
+ == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
+ || queue.getLifetimePolicy()
+ == LifetimePolicy.DELETE_ON_SESSION_END))
{
DurableConfigurationStore store = getDurableConfigurationStore();
DurableConfigurationStoreHelper.removeQueue(store, queue);
@@ -538,26 +547,24 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
}
}
- @Override
- public AMQQueue createQueue(UUID id,
- String queueName,
- boolean durable,
- String owner,
- boolean autoDelete,
- boolean exclusive,
- boolean deleteOnNoConsumer,
- Map<String, Object> arguments) throws QpidSecurityException, QueueExistsException
+ public AMQQueue createQueue(final AMQSessionModel creatingSession, Map<String, Object> attributes) throws QpidSecurityException, QueueExistsException
{
+ // make a copy as we may augment (with an ID for example)
+ attributes = new LinkedHashMap<String, Object>(attributes);
- if (queueName == null)
- {
- throw new IllegalArgumentException("Queue name must not be null");
- }
+ String queueName = MapValueConverter.getStringAttribute(Queue.NAME, attributes);
+ boolean autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class,
+ Queue.LIFETIME_POLICY,
+ attributes,
+ LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT;
+ boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE, attributes, false);
+ ExclusivityPolicy exclusive = MapValueConverter.getEnumAttribute(ExclusivityPolicy.class,Queue.EXCLUSIVE, attributes, ExclusivityPolicy.NONE);
+ String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null);
- // Access check
+ // Access check
if (!getSecurityManager().authoriseCreateQueue(autoDelete,
durable,
- exclusive,
+ exclusive != null && exclusive != ExclusivityPolicy.NONE,
null,
null,
queueName,
@@ -573,22 +580,27 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
{
throw new QueueExistsException("Queue with name " + queueName + " already exists", _queueRegistry.getQueue(queueName));
}
- if(id == null)
+ if(!attributes.containsKey(Queue.ID))
{
- id = UUIDGenerator.generateExchangeUUID(queueName, getName());
+ UUID id = UUIDGenerator.generateExchangeUUID(queueName, getName());
while(_queueRegistry.getQueue(id) != null)
{
id = UUID.randomUUID();
}
+ attributes.put(Queue.ID, id);
}
- else if(_queueRegistry.getQueue(id) != null)
+ else if(_queueRegistry.getQueue(MapValueConverter.getUUIDAttribute(Queue.ID, attributes)) != null)
{
- throw new QueueExistsException("Queue with id " + id + " already exists", _queueRegistry.getQueue(queueName));
+ throw new QueueExistsException("Queue with id "
+ + MapValueConverter.getUUIDAttribute(Queue.ID,
+ attributes)
+ + " already exists", _queueRegistry.getQueue(queueName));
}
- return _queueFactory.createQueue(id, queueName, durable, owner, autoDelete, exclusive, deleteOnNoConsumer,
- arguments);
+
+
+ return _queueFactory.createQueue(creatingSession, attributes);
}
}
@@ -980,13 +992,13 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
// house keeping task from running.
}
}
- for (AMQConnectionModel connection : getConnectionRegistry().getConnections())
+ for (AMQConnectionModel<?,?> connection : getConnectionRegistry().getConnections())
{
if (_logger.isDebugEnabled())
{
_logger.debug("Checking for long running open transactions on connection " + connection);
}
- for (AMQSessionModel session : connection.getSessionModels())
+ for (AMQSessionModel<?,?> session : connection.getSessionModels())
{
if (_logger.isDebugEnabled())
{
@@ -1046,5 +1058,54 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
{
return _model;
}
+
+ }
+
+ @Override
+ public long getDefaultAlertThresholdMessageAge()
+ {
+ return getConfiguration().getMaximumMessageAge();
+ }
+
+ @Override
+ public long getDefaultAlertThresholdMessageSize()
+ {
+ return getConfiguration().getMaximumMessageSize();
+ }
+
+ @Override
+ public long getDefaultAlertThresholdQueueDepthMessages()
+ {
+ return getConfiguration().getMaximumMessageCount();
+ }
+
+ @Override
+ public long getDefaultAlertThresholdQueueDepthBytes()
+ {
+ return getConfiguration().getMaximumQueueDepth();
+ }
+
+ @Override
+ public long getDefaultAlertRepeatGap()
+ {
+ return getConfiguration().getMinimumAlertRepeatGap();
+ }
+
+ @Override
+ public long getDefaultQueueFlowControlSizeBytes()
+ {
+ return getConfiguration().getCapacity();
+ }
+
+ @Override
+ public long getDefaultQueueFlowResumeSizeBytes()
+ {
+ return getConfiguration().getFlowResumeCapacity();
+ }
+
+ @Override
+ public int getDefaultMaximumDeliveryAttempts()
+ {
+ return getConfiguration().getMaxDeliveryCount();
}
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
index 12f8c7dae8..88caf73032 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
@@ -42,6 +42,7 @@ import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION;
public class DefaultUpgraderProvider implements UpgraderProvider
{
+ public static final String EXCLUSIVE = "exclusive";
private final ExchangeRegistry _exchangeRegistry;
private final VirtualHost _virtualHost;
@@ -63,7 +64,8 @@ public class DefaultUpgraderProvider implements UpgraderProvider
currentUpgrader = addUpgrader(currentUpgrader, new Version1Upgrader());
case 2:
currentUpgrader = addUpgrader(currentUpgrader, new Version2Upgrader());
-
+ case 3:
+ currentUpgrader = addUpgrader(currentUpgrader, new Version3Upgrader());
case CURRENT_CONFIG_VERSION:
currentUpgrader = addUpgrader(currentUpgrader, new NullUpgrader(recoverer));
break;
@@ -263,4 +265,49 @@ public class DefaultUpgraderProvider implements UpgraderProvider
}
}
+ /*
+ * Convert the storage of queue attribute exclusive to change exclusive from a boolean to an enum
+ * where exclusive was false it will now be "NONE", and where true it will now be "CONTAINER"
+ * ensure OWNER is null unless the exclusivity policy is CONTAINER
+ */
+ private class Version3Upgrader extends NonNullUpgrader
+ {
+
+ @Override
+ public void configuredObject(UUID id, String type, Map<String, Object> attributes)
+ {
+ if(Queue.class.getSimpleName().equals(type))
+ {
+ Map<String, Object> newAttributes = new LinkedHashMap<String, Object>(attributes);
+ if(attributes.get(EXCLUSIVE) instanceof Boolean)
+ {
+ boolean isExclusive = (Boolean) attributes.get(EXCLUSIVE);
+ newAttributes.put(EXCLUSIVE, isExclusive ? "CONTAINER" : "NONE");
+ if(!isExclusive && attributes.containsKey("owner"))
+ {
+ newAttributes.remove("owner");
+ }
+ }
+ else
+ {
+ newAttributes.remove("owner");
+ }
+ if(!attributes.containsKey("durable"))
+ {
+ newAttributes.put("durable","true");
+ }
+ attributes = newAttributes;
+ getUpdateMap().put(id, new ConfiguredObjectRecord(id,type,attributes));
+ }
+
+ getNextUpgrader().configuredObject(id,type,attributes);
+ }
+
+ @Override
+ public void complete()
+ {
+ getNextUpgrader().complete();
+ }
+ }
+
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
index 59ff1ce6a1..c687cbda92 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
@@ -69,7 +69,7 @@ public class ExchangeRecoverer extends AbstractDurableConfiguredObjectRecoverer<
String exchangeType = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.TYPE);
String lifeTimePolicy = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY);
boolean autoDelete = lifeTimePolicy == null
- || LifetimePolicy.valueOf(lifeTimePolicy) == LifetimePolicy.AUTO_DELETE;
+ || LifetimePolicy.valueOf(lifeTimePolicy) != LifetimePolicy.PERMANENT;
try
{
_exchange = _exchangeRegistry.getExchange(id);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
index fd836fdd98..621ea02059 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
@@ -104,13 +104,6 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ
public AMQQueue resolve()
{
String queueName = (String) _attributes.get(Queue.NAME);
- String owner = (String) _attributes.get(Queue.OWNER);
- boolean exclusive = (Boolean) _attributes.get(Queue.EXCLUSIVE);
-
- Map<String, Object> queueArgumentsMap = new LinkedHashMap<String, Object>(_attributes);
- queueArgumentsMap.remove(Queue.NAME);
- queueArgumentsMap.remove(Queue.OWNER);
- queueArgumentsMap.remove(Queue.EXCLUSIVE);
try
{
@@ -122,8 +115,9 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ
if (_queue == null)
{
- _queue = _queueFactory.restoreQueue(_id, queueName, owner, false, exclusive,
- false, queueArgumentsMap);
+ Map<String, Object> attributes = new LinkedHashMap<String, Object>(_attributes);
+ attributes.put(Queue.ID, _id);
+ _queue = _queueFactory.restoreQueue(attributes);
}
}
catch (QpidSecurityException e)
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 7cd9045af8..9996684bad 100755
--- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.QpidSecurityException;
@@ -59,14 +60,7 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable
int removeQueue(AMQQueue queue) throws QpidSecurityException;
- AMQQueue createQueue(UUID id,
- String queueName,
- boolean durable,
- String owner,
- boolean autoDelete,
- boolean exclusive,
- boolean deleteOnNoConsumer,
- Map<String, Object> arguments) throws QueueExistsException, QpidSecurityException;
+ AMQQueue createQueue(final AMQSessionModel creatingSession, Map<String, Object> arguments) throws QueueExistsException, QpidSecurityException;
Exchange createExchange(UUID id,
@@ -130,4 +124,20 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable
public void block();
public void unblock();
+
+ long getDefaultAlertThresholdMessageAge();
+
+ long getDefaultAlertThresholdMessageSize();
+
+ long getDefaultAlertThresholdQueueDepthMessages();
+
+ long getDefaultAlertThresholdQueueDepthBytes();
+
+ long getDefaultAlertRepeatGap();
+
+ long getDefaultQueueFlowControlSizeBytes();
+
+ long getDefaultQueueFlowResumeSizeBytes();
+
+ int getDefaultMaximumDeliveryAttempts();
}