summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java56
1 files changed, 29 insertions, 27 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
index 78f6d38d93..f3ddf32e5a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
@@ -43,6 +43,7 @@ import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.util.MapValueConverter;
final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.SubscriptionRegistrationListener, AMQQueue.NotificationListener
{
@@ -50,15 +51,16 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
static final Map<String, String> ATTRIBUTE_MAPPINGS = new HashMap<String, String>();
static
{
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_REPEAT_GAP, "x-qpid-minimum-alert-repeat-gap");
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_AGE, "x-qpid-maximum-message-age");
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, "x-qpid-maximum-message-size");
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, "x-qpid-maximum-message-count");
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_REPEAT_GAP, AMQQueueFactory.X_QPID_MINIMUM_ALERT_REPEAT_GAP);
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_AGE, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_AGE);
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_SIZE);
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_COUNT);
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, AMQQueueFactory.X_QPID_MAXIMUM_QUEUE_DEPTH);
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, "x-qpid-maximum-delivery-count");
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT);
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, "x-qpid-capacity");
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, "x-qpid-flow-resume-capacity");
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, AMQQueueFactory.X_QPID_CAPACITY);
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, AMQQueueFactory.X_QPID_FLOW_RESUME_CAPACITY);
QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.SORT_KEY, AMQQueueFactory.QPID_QUEUE_SORT_KEY);
QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_LAST_VALUE_QUEUE_KEY);
@@ -78,7 +80,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
public QueueAdapter(final VirtualHostAdapter virtualHostAdapter, final AMQQueue queue)
{
- super(queue.getId());
+ super(queue.getId(), virtualHostAdapter.getTaskExecutor());
_vhost = virtualHostAdapter;
addParent(org.apache.qpid.server.model.VirtualHost.class, virtualHostAdapter);
@@ -205,47 +207,47 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
}
@Override
- public Object setAttribute(String name, Object expected, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException
+ public boolean changeAttribute(String name, Object expected, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException
{
try
{
if(ALERT_REPEAT_GAP.equals(name))
{
_queue.setMinimumAlertRepeatGap((Long)desired);
- return desired;
+ return true;
}
else if(ALERT_THRESHOLD_MESSAGE_AGE.equals(name))
{
_queue.setMaximumMessageAge((Long)desired);
- return desired;
+ return true;
}
else if(ALERT_THRESHOLD_MESSAGE_SIZE.equals(name))
{
_queue.setMaximumMessageSize((Long)desired);
- return desired;
+ return true;
}
else if(ALERT_THRESHOLD_QUEUE_DEPTH_BYTES.equals(name))
{
_queue.setMaximumQueueDepth((Long)desired);
- return desired;
+ return true;
}
else if(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES.equals(name))
{
_queue.setMaximumMessageCount((Long)desired);
- return desired;
+ return true;
}
else if(ALTERNATE_EXCHANGE.equals(name))
{
// In future we may want to accept a UUID as an alternative way to identifying the exchange
ExchangeAdapter alternateExchange = (ExchangeAdapter) desired;
_queue.setAlternateExchange(alternateExchange == null ? null : alternateExchange.getExchange());
- return desired;
+ return true;
}
else if(EXCLUSIVE.equals(name))
{
Boolean exclusiveFlag = (Boolean) desired;
_queue.setExclusive(exclusiveFlag);
- return desired;
+ return true;
}
else if(MESSAGE_GROUP_KEY.equals(name))
{
@@ -266,7 +268,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
else if(MAXIMUM_DELIVERY_ATTEMPTS.equals(name))
{
_queue.setMaximumDeliveryCount((Integer)desired);
- return desired;
+ return true;
}
else if(NO_LOCAL.equals(name))
{
@@ -279,12 +281,12 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
else if(QUEUE_FLOW_CONTROL_SIZE_BYTES.equals(name))
{
_queue.setCapacity((Long)desired);
- return desired;
+ return true;
}
else if(QUEUE_FLOW_RESUME_SIZE_BYTES.equals(name))
{
_queue.setFlowResumeCapacity((Long)desired);
- return desired;
+ return true;
}
else if(QUEUE_FLOW_STOPPED.equals(name))
{
@@ -301,10 +303,10 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
else if (DESCRIPTION.equals(name))
{
_queue.setDescription((String) desired);
- return desired;
+ return true;
}
- return super.setAttribute(name, expected, desired);
+ return super.changeAttribute(name, expected, desired);
}
finally
{
@@ -495,8 +497,8 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
throws AccessControlException, IllegalStateException
{
attributes = new HashMap<String, Object>(attributes);
- String bindingKey = getStringAttribute(org.apache.qpid.server.model.Binding.NAME, attributes, "");
- Map<String, Object> bindingArgs = getMapAttribute(org.apache.qpid.server.model.Binding.ARGUMENTS, attributes, Collections.EMPTY_MAP);
+ String bindingKey = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Binding.NAME, attributes, "");
+ Map<String, Object> bindingArgs = MapValueConverter.getMapAttribute(org.apache.qpid.server.model.Binding.ARGUMENTS, attributes, Collections.<String,Object>emptyMap());
attributes.remove(org.apache.qpid.server.model.Binding.NAME);
attributes.remove(org.apache.qpid.server.model.Binding.ARGUMENTS);
@@ -508,7 +510,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
@Override
- public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
+ public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
{
if(childClass == org.apache.qpid.server.model.Binding.class)
{
@@ -712,15 +714,15 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
}
@Override
- public State setDesiredState(State currentState, State desiredState) throws IllegalStateTransitionException,
+ protected boolean setState(State currentState, State desiredState) throws IllegalStateTransitionException,
AccessControlException
{
if (desiredState == State.DELETED)
{
delete();
- return State.DELETED;
+ return true;
}
- return super.setDesiredState(currentState, desiredState);
+ return false;
}
}