diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java | 56 |
1 files changed, 29 insertions, 27 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java index 78f6d38d93..f3ddf32e5a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java @@ -43,6 +43,7 @@ import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Statistics; import org.apache.qpid.server.queue.*; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.util.MapValueConverter; final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.SubscriptionRegistrationListener, AMQQueue.NotificationListener { @@ -50,15 +51,16 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs static final Map<String, String> ATTRIBUTE_MAPPINGS = new HashMap<String, String>(); static { - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_REPEAT_GAP, "x-qpid-minimum-alert-repeat-gap"); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_AGE, "x-qpid-maximum-message-age"); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, "x-qpid-maximum-message-size"); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, "x-qpid-maximum-message-count"); + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_REPEAT_GAP, AMQQueueFactory.X_QPID_MINIMUM_ALERT_REPEAT_GAP); + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_AGE, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_AGE); + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_SIZE); + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_COUNT); + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, AMQQueueFactory.X_QPID_MAXIMUM_QUEUE_DEPTH); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, "x-qpid-maximum-delivery-count"); + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, "x-qpid-capacity"); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, "x-qpid-flow-resume-capacity"); + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, AMQQueueFactory.X_QPID_CAPACITY); + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, AMQQueueFactory.X_QPID_FLOW_RESUME_CAPACITY); QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.SORT_KEY, AMQQueueFactory.QPID_QUEUE_SORT_KEY); QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_LAST_VALUE_QUEUE_KEY); @@ -78,7 +80,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs public QueueAdapter(final VirtualHostAdapter virtualHostAdapter, final AMQQueue queue) { - super(queue.getId()); + super(queue.getId(), virtualHostAdapter.getTaskExecutor()); _vhost = virtualHostAdapter; addParent(org.apache.qpid.server.model.VirtualHost.class, virtualHostAdapter); @@ -205,47 +207,47 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs } @Override - public Object setAttribute(String name, Object expected, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException + public boolean changeAttribute(String name, Object expected, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException { try { if(ALERT_REPEAT_GAP.equals(name)) { _queue.setMinimumAlertRepeatGap((Long)desired); - return desired; + return true; } else if(ALERT_THRESHOLD_MESSAGE_AGE.equals(name)) { _queue.setMaximumMessageAge((Long)desired); - return desired; + return true; } else if(ALERT_THRESHOLD_MESSAGE_SIZE.equals(name)) { _queue.setMaximumMessageSize((Long)desired); - return desired; + return true; } else if(ALERT_THRESHOLD_QUEUE_DEPTH_BYTES.equals(name)) { _queue.setMaximumQueueDepth((Long)desired); - return desired; + return true; } else if(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES.equals(name)) { _queue.setMaximumMessageCount((Long)desired); - return desired; + return true; } else if(ALTERNATE_EXCHANGE.equals(name)) { // In future we may want to accept a UUID as an alternative way to identifying the exchange ExchangeAdapter alternateExchange = (ExchangeAdapter) desired; _queue.setAlternateExchange(alternateExchange == null ? null : alternateExchange.getExchange()); - return desired; + return true; } else if(EXCLUSIVE.equals(name)) { Boolean exclusiveFlag = (Boolean) desired; _queue.setExclusive(exclusiveFlag); - return desired; + return true; } else if(MESSAGE_GROUP_KEY.equals(name)) { @@ -266,7 +268,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs else if(MAXIMUM_DELIVERY_ATTEMPTS.equals(name)) { _queue.setMaximumDeliveryCount((Integer)desired); - return desired; + return true; } else if(NO_LOCAL.equals(name)) { @@ -279,12 +281,12 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs else if(QUEUE_FLOW_CONTROL_SIZE_BYTES.equals(name)) { _queue.setCapacity((Long)desired); - return desired; + return true; } else if(QUEUE_FLOW_RESUME_SIZE_BYTES.equals(name)) { _queue.setFlowResumeCapacity((Long)desired); - return desired; + return true; } else if(QUEUE_FLOW_STOPPED.equals(name)) { @@ -301,10 +303,10 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs else if (DESCRIPTION.equals(name)) { _queue.setDescription((String) desired); - return desired; + return true; } - return super.setAttribute(name, expected, desired); + return super.changeAttribute(name, expected, desired); } finally { @@ -495,8 +497,8 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs throws AccessControlException, IllegalStateException { attributes = new HashMap<String, Object>(attributes); - String bindingKey = getStringAttribute(org.apache.qpid.server.model.Binding.NAME, attributes, ""); - Map<String, Object> bindingArgs = getMapAttribute(org.apache.qpid.server.model.Binding.ARGUMENTS, attributes, Collections.EMPTY_MAP); + String bindingKey = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Binding.NAME, attributes, ""); + Map<String, Object> bindingArgs = MapValueConverter.getMapAttribute(org.apache.qpid.server.model.Binding.ARGUMENTS, attributes, Collections.<String,Object>emptyMap()); attributes.remove(org.apache.qpid.server.model.Binding.NAME); attributes.remove(org.apache.qpid.server.model.Binding.ARGUMENTS); @@ -508,7 +510,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs @Override - public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) + public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { if(childClass == org.apache.qpid.server.model.Binding.class) { @@ -712,15 +714,15 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs } @Override - public State setDesiredState(State currentState, State desiredState) throws IllegalStateTransitionException, + protected boolean setState(State currentState, State desiredState) throws IllegalStateTransitionException, AccessControlException { if (desiredState == State.DELETED) { delete(); - return State.DELETED; + return true; } - return super.setDesiredState(currentState, desiredState); + return false; } } |