summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java816
1 files changed, 816 insertions, 0 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
new file mode 100644
index 0000000000..074f7c243b
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
@@ -0,0 +1,816 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model.adapter;
+
+import java.lang.reflect.Type;
+import java.security.AccessControlException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ConfiguredObjectFinder;
+import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.IllegalStateTransitionException;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.QueueNotificationListener;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.Statistics;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.*;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.util.MapValueConverter;
+
+final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.SubscriptionRegistrationListener, AMQQueue.NotificationListener
+{
+ @SuppressWarnings("serial")
+ static final Map<String, Type> ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap<String, Type>(){{
+ put(ALERT_REPEAT_GAP, Long.class);
+ put(ALERT_THRESHOLD_MESSAGE_AGE, Long.class);
+ put(ALERT_THRESHOLD_MESSAGE_SIZE, Long.class);
+ put(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, Long.class);
+ put(ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, Long.class);
+ put(QUEUE_FLOW_CONTROL_SIZE_BYTES, Long.class);
+ put(QUEUE_FLOW_RESUME_SIZE_BYTES, Long.class);
+ put(MAXIMUM_DELIVERY_ATTEMPTS, Integer.class);
+ put(EXCLUSIVE, Boolean.class);
+ put(DESCRIPTION, String.class);
+ }});
+
+ private final AMQQueue _queue;
+ private final Map<Binding, BindingAdapter> _bindingAdapters =
+ new HashMap<Binding, BindingAdapter>();
+ private Map<org.apache.qpid.server.subscription.Subscription, ConsumerAdapter> _consumerAdapters =
+ new HashMap<org.apache.qpid.server.subscription.Subscription, ConsumerAdapter>();
+
+
+ private final VirtualHostAdapter _vhost;
+ private QueueStatisticsAdapter _statistics;
+ private QueueNotificationListener _queueNotificationListener;
+
+ public QueueAdapter(final VirtualHostAdapter virtualHostAdapter, final AMQQueue queue)
+ {
+ super(queue.getId(), virtualHostAdapter.getTaskExecutor());
+ _vhost = virtualHostAdapter;
+ addParent(org.apache.qpid.server.model.VirtualHost.class, virtualHostAdapter);
+
+ _queue = queue;
+ _queue.addSubscriptionRegistrationListener(this);
+ populateConsumers();
+ _statistics = new QueueStatisticsAdapter(queue);
+ _queue.setNotificationListener(this);
+ }
+
+ /**
+ * Helper method to retrieve the SessionAdapter keyed by the AMQSessionModel.
+ * This method first finds the ConnectionAdapter associated with the Session from this QueueAdapter's parent vhost
+ * then it does a lookup on that to find the SessionAdapter keyed by the requested AMQSessionModel instance.
+ * @param session the AMQSessionModel used to index the SessionAdapter.
+ * @return the requested SessionAdapter or null if it can't be found.
+ */
+ private SessionAdapter getSessionAdapter(AMQSessionModel session)
+ {
+ // Retrieve the ConnectionModel associated with the SessionModel as a key to lookup the ConnectionAdapter.
+ AMQConnectionModel connectionKey = session.getConnectionModel();
+
+ // Lookup the ConnectionAdapter, from which we should be able to retrieve the SessionAdapter we really want.
+ ConnectionAdapter connectionAdapter = _vhost.getConnectionAdapter(connectionKey);
+ if (connectionAdapter == null)
+ {
+ return null; // If we can't find an associated ConnectionAdapter the SessionAdapter is a lost cause.
+ }
+ else
+ { // With a good ConnectionAdapter we can finally try to find the SessionAdapter we are actually looking for.
+ SessionAdapter sessionAdapter = connectionAdapter.getSessionAdapter(session);
+ if (sessionAdapter == null)
+ {
+ return null; // If the SessionAdapter isn't associated with the selected ConnectionAdapter give up.
+ }
+ else
+ {
+ return sessionAdapter;
+ }
+ }
+ }
+
+ private void populateConsumers()
+ {
+ Collection<org.apache.qpid.server.subscription.Subscription> actualSubscriptions = _queue.getConsumers();
+
+ synchronized (_consumerAdapters)
+ {
+ Iterator<org.apache.qpid.server.subscription.Subscription> iter = _consumerAdapters.keySet().iterator();
+ for(org.apache.qpid.server.subscription.Subscription subscription : actualSubscriptions)
+ {
+ if(!_consumerAdapters.containsKey(subscription))
+ {
+ SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
+ ConsumerAdapter adapter = new ConsumerAdapter(this, sessionAdapter, subscription);
+ _consumerAdapters.put(subscription, adapter);
+ if (sessionAdapter != null)
+ { // Register ConsumerAdapter with the SessionAdapter.
+ sessionAdapter.subscriptionRegistered(subscription, adapter);
+ }
+ }
+ }
+ }
+ }
+
+ public Collection<org.apache.qpid.server.model.Binding> getBindings()
+ {
+ synchronized (_bindingAdapters)
+ {
+ return new ArrayList<org.apache.qpid.server.model.Binding>(_bindingAdapters.values());
+ }
+ }
+
+ public Collection<Consumer> getConsumers()
+ {
+ synchronized (_consumerAdapters)
+ {
+ return new ArrayList<Consumer>(_consumerAdapters.values());
+ }
+
+ }
+
+ public void visit(final QueueEntryVisitor visitor)
+ {
+ _queue.visit(visitor);
+ }
+
+ public void delete()
+ {
+ try
+ {
+ _queue.getVirtualHost().removeQueue(_queue);
+ }
+ catch(AMQException e)
+ {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public String getName()
+ {
+ return _queue.getName();
+ }
+
+ public String setName(final String currentName, final String desiredName)
+ throws IllegalStateException, AccessControlException
+ {
+ return null; //TODO
+ }
+
+ public State getActualState()
+ {
+ return null; //TODO
+ }
+
+ public boolean isDurable()
+ {
+ return _queue.isDurable();
+ }
+
+ public void setDurable(final boolean durable)
+ throws IllegalStateException, AccessControlException, IllegalArgumentException
+ {
+ //TODO
+ }
+
+ public LifetimePolicy getLifetimePolicy()
+ {
+ return _queue.isAutoDelete() ? LifetimePolicy.AUTO_DELETE : LifetimePolicy.PERMANENT;
+ }
+
+ public LifetimePolicy setLifetimePolicy(final LifetimePolicy expected, final LifetimePolicy desired)
+ throws IllegalStateException, AccessControlException, IllegalArgumentException
+ {
+ return null; //TODO
+ }
+
+ public long getTimeToLive()
+ {
+ return 0; //TODO
+ }
+
+ public long setTimeToLive(final long expected, final long desired)
+ throws IllegalStateException, AccessControlException, IllegalArgumentException
+ {
+ return 0; //TODO
+ }
+
+
+ @Override
+ public Collection<String> getAttributeNames()
+ {
+ return Queue.AVAILABLE_ATTRIBUTES;
+ }
+
+ @Override
+ public boolean changeAttribute(String name, Object expected, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException
+ {
+ try
+ {
+ if(ALERT_REPEAT_GAP.equals(name))
+ {
+ _queue.setMinimumAlertRepeatGap((Long)desired);
+ return true;
+ }
+ else if(ALERT_THRESHOLD_MESSAGE_AGE.equals(name))
+ {
+ _queue.setMaximumMessageAge((Long)desired);
+ return true;
+ }
+ else if(ALERT_THRESHOLD_MESSAGE_SIZE.equals(name))
+ {
+ _queue.setMaximumMessageSize((Long)desired);
+ return true;
+ }
+ else if(ALERT_THRESHOLD_QUEUE_DEPTH_BYTES.equals(name))
+ {
+ _queue.setMaximumQueueDepth((Long)desired);
+ return true;
+ }
+ else if(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES.equals(name))
+ {
+ _queue.setMaximumMessageCount((Long)desired);
+ return true;
+ }
+ else if(ALTERNATE_EXCHANGE.equals(name))
+ {
+ // In future we may want to accept a UUID as an alternative way to identifying the exchange
+ ExchangeAdapter alternateExchange = (ExchangeAdapter) desired;
+ _queue.setAlternateExchange(alternateExchange == null ? null : alternateExchange.getExchange());
+ return true;
+ }
+ else if(EXCLUSIVE.equals(name))
+ {
+ Boolean exclusiveFlag = (Boolean) desired;
+ _queue.setExclusive(exclusiveFlag);
+ return true;
+ }
+ else if(MESSAGE_GROUP_KEY.equals(name))
+ {
+ // TODO
+ }
+ else if(MESSAGE_GROUP_SHARED_GROUPS.equals(name))
+ {
+ // TODO
+ }
+ else if(LVQ_KEY.equals(name))
+ {
+ // TODO
+ }
+ else if(MAXIMUM_DELIVERY_ATTEMPTS.equals(name))
+ {
+ _queue.setMaximumDeliveryCount((Integer)desired);
+ return true;
+ }
+ else if(NO_LOCAL.equals(name))
+ {
+ // TODO
+ }
+ else if(OWNER.equals(name))
+ {
+ // TODO
+ }
+ else if(QUEUE_FLOW_CONTROL_SIZE_BYTES.equals(name))
+ {
+ _queue.setCapacity((Long)desired);
+ return true;
+ }
+ else if(QUEUE_FLOW_RESUME_SIZE_BYTES.equals(name))
+ {
+ _queue.setFlowResumeCapacity((Long)desired);
+ return true;
+ }
+ else if(QUEUE_FLOW_STOPPED.equals(name))
+ {
+ // TODO
+ }
+ else if(SORT_KEY.equals(name))
+ {
+ // TODO
+ }
+ else if(TYPE.equals(name))
+ {
+ // TODO
+ }
+ else if (DESCRIPTION.equals(name))
+ {
+ _queue.setDescription((String) desired);
+ return true;
+ }
+
+ return super.changeAttribute(name, expected, desired);
+ }
+ finally
+ {
+ if (_queue.isDurable())
+ {
+ try
+ {
+ DurableConfigurationStoreHelper.updateQueue(_queue.getVirtualHost().getDurableConfigurationStore(),
+ _queue);
+ }
+ catch (AMQStoreException e)
+ {
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public Object getAttribute(String name)
+ {
+
+ if(ALERT_REPEAT_GAP.equals(name))
+ {
+ return _queue.getMinimumAlertRepeatGap();
+ }
+ else if(ALERT_THRESHOLD_MESSAGE_AGE.equals(name))
+ {
+ return _queue.getMaximumMessageAge();
+ }
+ else if(ALERT_THRESHOLD_MESSAGE_SIZE.equals(name))
+ {
+ return _queue.getMaximumMessageSize();
+ }
+ else if(ALERT_THRESHOLD_QUEUE_DEPTH_BYTES.equals(name))
+ {
+ return _queue.getMaximumQueueDepth();
+ }
+ else if(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES.equals(name))
+ {
+ return _queue.getMaximumMessageCount();
+ }
+ else if(ALTERNATE_EXCHANGE.equals(name))
+ {
+ org.apache.qpid.server.exchange.Exchange alternateExchange = _queue.getAlternateExchange();
+ return alternateExchange == null ? null :
+ ConfiguredObjectFinder.findConfiguredObjectByName(_vhost.getExchanges(),
+ alternateExchange.getName());
+ }
+ else if(EXCLUSIVE.equals(name))
+ {
+ return _queue.isExclusive();
+ }
+ else if(MESSAGE_GROUP_KEY.equals(name))
+ {
+ return _queue.getAttribute(MESSAGE_GROUP_KEY);
+ }
+ else if(MESSAGE_GROUP_SHARED_GROUPS.equals(name))
+ {
+ //We only return the boolean value if message groups are actually in use
+ return getAttribute(MESSAGE_GROUP_KEY) == null ? null : _queue.getAttribute(MESSAGE_GROUP_SHARED_GROUPS);
+ }
+ else if(LVQ_KEY.equals(name))
+ {
+ if(_queue instanceof ConflationQueue)
+ {
+ return ((ConflationQueue)_queue).getConflationKey();
+ }
+ }
+ else if(MAXIMUM_DELIVERY_ATTEMPTS.equals(name))
+ {
+ return _queue.getMaximumDeliveryCount();
+ }
+ else if(NO_LOCAL.equals(name))
+ {
+ // TODO
+ }
+ else if(OWNER.equals(name))
+ {
+ return _queue.getOwner();
+ }
+ else if(QUEUE_FLOW_CONTROL_SIZE_BYTES.equals(name))
+ {
+ return _queue.getCapacity();
+ }
+ else if(QUEUE_FLOW_RESUME_SIZE_BYTES.equals(name))
+ {
+ return _queue.getFlowResumeCapacity();
+ }
+ else if(QUEUE_FLOW_STOPPED.equals(name))
+ {
+ return _queue.isOverfull();
+ }
+ else if(SORT_KEY.equals(name))
+ {
+ if(_queue instanceof SortedQueue)
+ {
+ return ((SortedQueue)_queue).getSortedPropertyName();
+ }
+ }
+ else if(TYPE.equals(name))
+ {
+ if(_queue instanceof SortedQueue)
+ {
+ return "sorted";
+ }
+ if(_queue instanceof ConflationQueue)
+ {
+ return "lvq";
+ }
+ if(_queue instanceof AMQPriorityQueue)
+ {
+ return "priority";
+ }
+ return "standard";
+ }
+ else if(CREATED.equals(name))
+ {
+ // TODO
+ }
+ else if(DURABLE.equals(name))
+ {
+ return _queue.isDurable();
+ }
+ else if(ID.equals(name))
+ {
+ return getId();
+ }
+ else if(LIFETIME_POLICY.equals(name))
+ {
+ return _queue.isAutoDelete() ? LifetimePolicy.AUTO_DELETE : LifetimePolicy.PERMANENT;
+ }
+ else if(NAME.equals(name))
+ {
+ return _queue.getName();
+ }
+ else if(STATE.equals(name))
+ {
+ return State.ACTIVE; // TODO
+ }
+ else if(TIME_TO_LIVE.equals(name))
+ {
+ // TODO
+ }
+ else if(UPDATED.equals(name))
+ {
+ // TODO
+ }
+ else if (DESCRIPTION.equals(name))
+ {
+ return _queue.getDescription();
+ }
+ else if(PRIORITIES.equals(name))
+ {
+ if(_queue instanceof AMQPriorityQueue)
+ {
+ return ((AMQPriorityQueue)_queue).getPriorities();
+ }
+ }
+ return super.getAttribute(name);
+ }
+
+ public Statistics getStatistics()
+ {
+ return _statistics;
+ }
+
+ @Override
+ public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
+ {
+ if(clazz == Consumer.class)
+ {
+ return (Collection<C>) getConsumers();
+ }
+ else if(clazz == org.apache.qpid.server.model.Binding.class)
+ {
+ return (Collection<C>) getBindings();
+ }
+ else
+ {
+ return Collections.emptySet();
+ }
+ }
+
+ public org.apache.qpid.server.model.Binding createBinding(Exchange exchange, Map<String, Object> attributes)
+ throws AccessControlException, IllegalStateException
+ {
+ attributes = new HashMap<String, Object>(attributes);
+ String bindingKey = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Binding.NAME, attributes, "");
+ Map<String, Object> bindingArgs = MapValueConverter.getMapAttribute(org.apache.qpid.server.model.Binding.ARGUMENTS, attributes, Collections.<String,Object>emptyMap());
+
+ attributes.remove(org.apache.qpid.server.model.Binding.NAME);
+ attributes.remove(org.apache.qpid.server.model.Binding.ARGUMENTS);
+
+ return exchange.createBinding(bindingKey, this, bindingArgs, attributes);
+
+ }
+
+
+
+ @Override
+ public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
+ {
+ if(childClass == org.apache.qpid.server.model.Binding.class)
+ {
+ if(otherParents != null && otherParents.length == 1 && otherParents[0] instanceof Exchange)
+ {
+ Exchange exchange = (Exchange) otherParents[0];
+ if(exchange.getParent(org.apache.qpid.server.model.VirtualHost.class) == getParent(org.apache.qpid.server.model.VirtualHost.class))
+ {
+ return (C) createBinding(exchange, attributes);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Queue and Exchange parents of a binding must be on same virtual host");
+ }
+ }
+ else
+ {
+ throw new IllegalArgumentException("Other parent must be an exchange");
+ }
+ }
+ else
+ {
+ throw new IllegalArgumentException();
+ }
+ }
+
+ void bindingRegistered(Binding binding, BindingAdapter adapter)
+ {
+ synchronized (_bindingAdapters)
+ {
+ _bindingAdapters.put(binding, adapter);
+ }
+ childAdded(adapter);
+ }
+
+ void bindingUnregistered(Binding binding)
+ {
+ BindingAdapter adapter = null;
+ synchronized (_bindingAdapters)
+ {
+ adapter = _bindingAdapters.remove(binding);
+ }
+ if(adapter != null)
+ {
+ childRemoved(adapter);
+ }
+ }
+
+ AMQQueue getAMQQueue()
+ {
+ return _queue;
+ }
+
+ public void subscriptionRegistered(final AMQQueue queue, final Subscription subscription)
+ {
+ ConsumerAdapter adapter = null;
+ synchronized (_consumerAdapters)
+ {
+ if(!_consumerAdapters.containsKey(subscription))
+ {
+ SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
+ adapter = new ConsumerAdapter(this, sessionAdapter, subscription);
+ _consumerAdapters.put(subscription, adapter);
+ if (sessionAdapter != null)
+ { // Register ConsumerAdapter with the SessionAdapter.
+ sessionAdapter.subscriptionRegistered(subscription, adapter);
+ }
+ }
+ }
+ if(adapter != null)
+ {
+ childAdded(adapter);
+ }
+ }
+
+ public void subscriptionUnregistered(final AMQQueue queue, final Subscription subscription)
+ {
+ ConsumerAdapter adapter = null;
+
+ synchronized (_consumerAdapters)
+ {
+ adapter = _consumerAdapters.remove(subscription);
+ }
+ if(adapter != null)
+ {
+ SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
+ if (sessionAdapter != null)
+ { // Unregister ConsumerAdapter with the SessionAdapter.
+ sessionAdapter.subscriptionUnregistered(subscription);
+ }
+ childRemoved(adapter);
+ }
+ }
+
+ VirtualHostAdapter getVirtualHost()
+ {
+ return _vhost;
+ }
+
+
+ private static class QueueStatisticsAdapter implements Statistics
+ {
+
+ private final AMQQueue _queue;
+
+ public QueueStatisticsAdapter(AMQQueue queue)
+ {
+ _queue = queue;
+ }
+
+ public Collection<String> getStatisticNames()
+ {
+ return Queue.AVAILABLE_STATISTICS;
+ }
+
+ public Object getStatistic(String name)
+ {
+ if(BINDING_COUNT.equals(name))
+ {
+ return _queue.getBindingCount();
+ }
+ else if(CONSUMER_COUNT.equals(name))
+ {
+ return _queue.getConsumerCount();
+ }
+ else if(CONSUMER_COUNT_WITH_CREDIT.equals(name))
+ {
+ return _queue.getActiveConsumerCount();
+ }
+ else if(DISCARDS_TTL_BYTES.equals(name))
+ {
+ return null; // TODO
+ }
+ else if(DISCARDS_TTL_MESSAGES.equals(name))
+ {
+ return null; // TODO
+ }
+ else if(PERSISTENT_DEQUEUED_BYTES.equals(name))
+ {
+ return _queue.getPersistentByteDequeues();
+ }
+ else if(PERSISTENT_DEQUEUED_MESSAGES.equals(name))
+ {
+ return _queue.getPersistentMsgDequeues();
+ }
+ else if(PERSISTENT_ENQUEUED_BYTES.equals(name))
+ {
+ return _queue.getPersistentByteEnqueues();
+ }
+ else if(PERSISTENT_ENQUEUED_MESSAGES.equals(name))
+ {
+ return _queue.getPersistentMsgEnqueues();
+ }
+ else if(QUEUE_DEPTH_BYTES.equals(name))
+ {
+ return _queue.getQueueDepth();
+ }
+ else if(QUEUE_DEPTH_MESSAGES.equals(name))
+ {
+ return _queue.getMessageCount();
+ }
+ else if(STATE_CHANGED.equals(name))
+ {
+ return null; // TODO
+ }
+ else if(TOTAL_DEQUEUED_BYTES.equals(name))
+ {
+ return _queue.getTotalDequeueSize();
+ }
+ else if(TOTAL_DEQUEUED_MESSAGES.equals(name))
+ {
+ return _queue.getTotalDequeueCount();
+ }
+ else if(TOTAL_ENQUEUED_BYTES.equals(name))
+ {
+ return _queue.getTotalEnqueueSize();
+ }
+ else if(TOTAL_ENQUEUED_MESSAGES.equals(name))
+ {
+ return _queue.getTotalEnqueueCount();
+ }
+ else if(UNACKNOWLEDGED_BYTES.equals(name))
+ {
+ return _queue.getUnackedMessageBytes();
+ }
+ else if(UNACKNOWLEDGED_MESSAGES.equals(name))
+ {
+ return _queue.getUnackedMessageCount();
+ }
+
+ return null;
+ }
+ }
+
+ @Override
+ public void setNotificationListener(QueueNotificationListener listener)
+ {
+ _queueNotificationListener = listener;
+ }
+
+ @Override
+ public void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg)
+ {
+ QueueNotificationListener listener = _queueNotificationListener;
+ if(listener != null)
+ {
+ listener.notifyClients(notification, this, notificationMsg);
+ }
+ }
+
+ @Override
+ protected boolean setState(State currentState, State desiredState) throws IllegalStateTransitionException,
+ AccessControlException
+ {
+ if (desiredState == State.DELETED)
+ {
+ delete();
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ protected void authoriseSetAttribute(String name, Object expected, Object desired) throws AccessControlException
+ {
+ if (!_vhost.getSecurityManager().authoriseUpdate(_queue))
+ {
+ throw new AccessControlException("Setting of queue attribute is denied");
+ }
+ }
+
+ @Override
+ protected void authoriseSetAttributes(Map<String, Object> attributes) throws AccessControlException
+ {
+ if (!_vhost.getSecurityManager().authoriseUpdate(_queue))
+ {
+ throw new AccessControlException("Setting of queue attributes is denied");
+ }
+ }
+
+ @Override
+ protected void changeAttributes(final Map<String, Object> attributes)
+ {
+ Map<String, Object> convertedAttributes = MapValueConverter.convert(attributes, ATTRIBUTE_TYPES);
+ validateAttributes(convertedAttributes);
+
+ super.changeAttributes(convertedAttributes);
+ }
+
+ private void validateAttributes(Map<String, Object> convertedAttributes)
+ {
+ Long queueFlowControlSize = (Long) convertedAttributes.get(QUEUE_FLOW_CONTROL_SIZE_BYTES);
+ Long queueFlowControlResumeSize = (Long) convertedAttributes.get(QUEUE_FLOW_RESUME_SIZE_BYTES);
+ if (queueFlowControlSize != null || queueFlowControlResumeSize != null )
+ {
+ if (queueFlowControlSize == null)
+ {
+ queueFlowControlSize = (Long)getAttribute(QUEUE_FLOW_CONTROL_SIZE_BYTES);
+ }
+ if (queueFlowControlResumeSize == null)
+ {
+ queueFlowControlResumeSize = (Long)getAttribute(QUEUE_FLOW_RESUME_SIZE_BYTES);
+ }
+ if (queueFlowControlResumeSize > queueFlowControlSize)
+ {
+ throw new IllegalConfigurationException("Flow resume size can't be greater than flow control size");
+ }
+ }
+ for (Map.Entry<String, Object> entry: convertedAttributes.entrySet())
+ {
+ Object value = entry.getValue();
+ if (value instanceof Number && ((Number)value).longValue() < 0)
+ {
+ throw new IllegalConfigurationException("Only positive integer value can be specified for the attribute "
+ + entry.getKey());
+ }
+ }
+ }
+
+}