diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /java/broker/src/main/java/org/apache/qpid/server/queue | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-rajith_jms_client.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue')
27 files changed, 0 insertions, 5980 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java deleted file mode 100644 index b6e97e08fb..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java +++ /dev/null @@ -1,90 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.queue; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionList; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import java.util.Map; - -public class AMQPriorityQueue extends SimpleAMQQueue -{ - protected AMQPriorityQueue(final AMQShortString name, - final boolean durable, - final AMQShortString owner, - final boolean autoDelete, - boolean exclusive, - final VirtualHost virtualHost, - int priorities, Map<String, Object> arguments) - { - super(name, durable, owner, autoDelete, exclusive, virtualHost,new PriorityQueueList.Factory(priorities), arguments); - } - - public AMQPriorityQueue(String queueName, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, VirtualHost virtualHost, int priorities, Map<String,Object> arguments) - { - this(queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner), - autoDelete, exclusive,virtualHost, priorities, arguments); - } - - public int getPriorities() - { - return ((PriorityQueueList) _entries).getPriorities(); - } - - @Override - protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry) - { - // check that all subscriptions are not in advance of the entry - SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator(); - while(subIter.advance() && !entry.isAcquired()) - { - final Subscription subscription = subIter.getNode().getSubscription(); - if(!subscription.isClosed()) - { - QueueContext context = (QueueContext) subscription.getQueueContext(); - if(context != null) - { - QueueEntry subnode = context._lastSeenEntry; - QueueEntry released = context._releasedEntry; - while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired() && (released == null || released.compareTo(entry) < 0)) - { - if(QueueContext._releasedUpdater.compareAndSet(context,released,entry)) - { - break; - } - else - { - subnode = context._lastSeenEntry; - released = context._releasedEntry; - } - } - } - } - - } - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java deleted file mode 100644 index 9b9de8333b..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ /dev/null @@ -1,275 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.configuration.QueueConfig; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeReferrer; -import org.apache.qpid.server.management.Managable; -import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.security.PrincipalHolder; -import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import java.util.List; -import java.util.Map; -import java.util.Set; - -public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource, BaseQueue, - QueueConfig -{ - boolean getDeleteOnNoConsumers(); - - void setDeleteOnNoConsumers(boolean b); - - void addBinding(Binding binding); - - void removeBinding(Binding binding); - - List<Binding> getBindings(); - - int getBindingCount(); - - LogSubject getLogSubject(); - - public interface Context - { - QueueEntry getLastSeenEntry(); - } - - void setNoLocal(boolean b); - - boolean isAutoDelete(); - - AMQShortString getOwner(); - PrincipalHolder getPrincipalHolder(); - void setPrincipalHolder(PrincipalHolder principalHolder); - - void setExclusiveOwningSession(AMQSessionModel owner); - AMQSessionModel getExclusiveOwningSession(); - - VirtualHost getVirtualHost(); - - void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException; - - void unregisterSubscription(final Subscription subscription) throws AMQException; - - - int getConsumerCount(); - - int getActiveConsumerCount(); - - boolean hasExclusiveSubscriber(); - - boolean isUnused(); - - boolean isEmpty(); - - int getMessageCount(); - - int getUndeliveredMessageCount(); - - - long getQueueDepth(); - - long getReceivedMessageCount(); - - long getOldestMessageArrivalTime(); - - boolean isDeleted(); - - int delete() throws AMQException; - - void requeue(QueueEntry entry); - - void dequeue(QueueEntry entry, Subscription sub); - - void decrementUnackedMsgCount(); - - boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException; - - void addQueueDeleteTask(final Task task); - void removeQueueDeleteTask(final Task task); - - - - List<QueueEntry> getMessagesOnTheQueue(); - - List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId); - - List<Long> getMessagesOnTheQueue(int num); - - List<Long> getMessagesOnTheQueue(int num, int offest); - - QueueEntry getMessageOnTheQueue(long messageId); - - /** - * Returns a list of QueEntries from a given range of queue positions, eg messages 5 to 10 on the queue. - * - * The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1. - * Using 0 in the 'to' field will return an empty list regardless of the 'from' value. - * @param fromPosition - * @param toPosition - * @return - */ - public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition); - - - void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, - ServerTransaction transaction); - - void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction transaction); - - void removeMessagesFromQueue(long fromMessageId, long toMessageId); - - - - long getMaximumMessageSize(); - - void setMaximumMessageSize(long value); - - - long getMaximumMessageCount(); - - void setMaximumMessageCount(long value); - - - long getMaximumQueueDepth(); - - void setMaximumQueueDepth(long value); - - - long getMaximumMessageAge(); - - void setMaximumMessageAge(final long maximumMessageAge); - - - long getMinimumAlertRepeatGap(); - - void setMinimumAlertRepeatGap(long value); - - - long getCapacity(); - - void setCapacity(long capacity); - - - long getFlowResumeCapacity(); - - void setFlowResumeCapacity(long flowResumeCapacity); - - boolean isOverfull(); - - void deleteMessageFromTop(); - - long clearQueue() throws AMQException; - - /** - * Checks the status of messages on the queue, purging expired ones, firing age related alerts etc. - * @throws AMQException - */ - void checkMessageStatus() throws AMQException; - - Set<NotificationCheck> getNotificationChecks(); - - void flushSubscription(final Subscription sub) throws AMQException; - - void deliverAsync(final Subscription sub); - - void deliverAsync(); - - void stop(); - - boolean isExclusive(); - - Exchange getAlternateExchange(); - - void setAlternateExchange(Exchange exchange); - - Map<String, Object> getArguments(); - - void checkCapacity(AMQChannel channel); - - /** - * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription - * already exists. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Represent failure to create a subscription, because an exclusive subscription already exists. - * </table> - * - * @todo Not an AMQP exception as no status code. - * - * @todo Move to top level, used outside this class. - */ - static final class ExistingExclusiveSubscription extends AMQException - { - - public ExistingExclusiveSubscription() - { - super(""); - } - } - - /** - * ExistingSubscriptionPreventsExclusive signals a failure to create an exclusize subscription, as a subscription - * already exists. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Represent failure to create an exclusize subscription, as a subscription already exists. - * </table> - * - * @todo Not an AMQP exception as no status code. - * - * @todo Move to top level, used outside this class. - */ - static final class ExistingSubscriptionPreventsExclusive extends AMQException - { - public ExistingSubscriptionPreventsExclusive() - { - super(""); - } - } - - static interface Task - { - public void doTask(AMQQueue queue) throws AMQException; - } - - void configure(ConfigurationPlugin config); - - ConfigurationPlugin getConfiguration(); - - ManagedObject getManagedObject(); - - void setExclusive(boolean exclusive) throws AMQException; -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java deleted file mode 100644 index bee55118ba..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ /dev/null @@ -1,253 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQSecurityException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.configuration.QueueConfiguration; - -import java.util.Map; -import java.util.HashMap; - -public class AMQQueueFactory -{ - public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities"); - public static final String QPID_LVQ_KEY = "qpid.LVQ_key"; - public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue"; - public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key"; - - private abstract static class QueueProperty - { - - private final AMQShortString _argumentName; - - - public QueueProperty(String argumentName) - { - _argumentName = new AMQShortString(argumentName); - } - - public AMQShortString getArgumentName() - { - return _argumentName; - } - - - public abstract void setPropertyValue(AMQQueue queue, Object value); - - } - - private abstract static class QueueLongProperty extends QueueProperty - { - - public QueueLongProperty(String argumentName) - { - super(argumentName); - } - - public void setPropertyValue(AMQQueue queue, Object value) - { - if(value instanceof Number) - { - setPropertyValue(queue, ((Number)value).longValue()); - } - - } - - abstract void setPropertyValue(AMQQueue queue, long value); - - - } - - private static final QueueProperty[] DECLAREABLE_PROPERTIES = { - new QueueLongProperty("x-qpid-maximum-message-age") - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setMaximumMessageAge(value); - } - }, - new QueueLongProperty("x-qpid-maximum-message-size") - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setMaximumMessageSize(value); - } - }, - new QueueLongProperty("x-qpid-maximum-message-count") - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setMaximumMessageCount(value); - } - }, - new QueueLongProperty("x-qpid-minimum-alert-repeat-gap") - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setMinimumAlertRepeatGap(value); - } - }, - new QueueLongProperty("x-qpid-capacity") - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setCapacity(value); - } - }, - new QueueLongProperty("x-qpid-flow-resume-capacity") - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setFlowResumeCapacity(value); - } - } - - }; - - - /** @see #createAMQQueueImpl(String, boolean, String, boolean, boolean, VirtualHost, Map) */ - public static AMQQueue createAMQQueueImpl(AMQShortString name, - boolean durable, - AMQShortString owner, - boolean autoDelete, - boolean exclusive, - VirtualHost virtualHost, final FieldTable arguments) throws AMQException - { - return createAMQQueueImpl(name == null ? null : name.toString(), - durable, - owner == null ? null : owner.toString(), - autoDelete, - exclusive, - virtualHost, FieldTable.convertToMap(arguments)); - } - - - public static AMQQueue createAMQQueueImpl(String queueName, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException - { - // Access check - if (!virtualHost.getSecurityManager().authoriseCreateQueue(autoDelete, durable, exclusive, null, null, new AMQShortString(queueName), owner)) - { - String description = "Permission denied: queue-name '" + queueName + "'"; - throw new AMQSecurityException(description); - } - - int priorities = 1; - String conflationKey = null; - if(arguments != null) - { - if(arguments.containsKey(QPID_LAST_VALUE_QUEUE) || arguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY)) - { - conflationKey = (String) arguments.get(QPID_LAST_VALUE_QUEUE_KEY); - if(conflationKey == null) - { - conflationKey = QPID_LVQ_KEY; - } - } - else if(arguments.containsKey(X_QPID_PRIORITIES.toString())) - { - Object prioritiesObj = arguments.get(X_QPID_PRIORITIES.toString()); - if(prioritiesObj instanceof Number) - { - priorities = ((Number)prioritiesObj).intValue(); - } - } - } - - AMQQueue q; - if(conflationKey != null) - { - q = new ConflationQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, conflationKey); - } - else if(priorities > 1) - { - q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, priorities, arguments); - } - else - { - q = new SimpleAMQQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments); - } - - //Register the new queue - virtualHost.getQueueRegistry().registerQueue(q); - q.configure(virtualHost.getConfiguration().getQueueConfiguration(queueName)); - - if(arguments != null) - { - for(QueueProperty p : DECLAREABLE_PROPERTIES) - { - if(arguments.containsKey(p.getArgumentName().toString())) - { - p.setPropertyValue(q, arguments.get(p.getArgumentName().toString())); - } - } - } - - return q; - - } - - - public static AMQQueue createAMQQueueImpl(QueueConfiguration config, VirtualHost host) throws AMQException - { - String queueName = config.getName(); - - boolean durable = config.getDurable(); - boolean autodelete = config.getAutoDelete(); - boolean exclusive = config.getExclusive(); - String owner = config.getOwner(); - Map<String,Object> arguments = null; - if(config.isLVQ() || config.getLVQKey() != null) - { - - arguments = new HashMap<String,Object>(); - arguments.put(QPID_LAST_VALUE_QUEUE, 1); - arguments.put(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey() == null ? QPID_LVQ_KEY : config.getLVQKey()); - } - else - { - boolean priority = config.getPriority(); - int priorities = config.getPriorities(); - if(priority || priorities > 0) - { - arguments = new HashMap<String,Object>(); - if (priorities < 0) - { - priorities = 10; - } - arguments.put("x-qpid-priorities", priorities); - } - } - - AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, exclusive, host, arguments); - q.configure(config); - return q; - } - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java deleted file mode 100644 index c8eb118b11..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ /dev/null @@ -1,647 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.management.common.mbeans.ManagedQueue; -import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor; -import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.management.AMQManagedObject; -import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.message.AMQMessage; -import org.apache.qpid.server.message.MessageTransferMessage; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.server.txn.LocalTransaction; -import org.apache.qpid.transport.MessageProperties; - -import javax.management.JMException; -import javax.management.MBeanException; -import javax.management.MBeanNotificationInfo; -import javax.management.Notification; -import javax.management.ObjectName; -import javax.management.OperationsException; -import javax.management.monitor.MonitorNotification; -import javax.management.openmbean.ArrayType; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.CompositeDataSupport; -import javax.management.openmbean.CompositeType; -import javax.management.openmbean.OpenDataException; -import javax.management.openmbean.OpenType; -import javax.management.openmbean.SimpleType; -import javax.management.openmbean.TabularData; -import javax.management.openmbean.TabularDataSupport; -import javax.management.openmbean.TabularType; - -import java.text.SimpleDateFormat; -import java.util.*; - -/** - * AMQQueueMBean is the management bean for an {@link AMQQueue}. - * - * <p/><tablse id="crc"><caption>CRC Caption</caption> - * <tr><th> Responsibilities <th> Collaborations - * </table> - */ -@MBeanDescription("Management Interface for AMQQueue") -public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener -{ - /** Used for debugging purposes. */ - private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class); - - private static final SimpleDateFormat _dateFormat = new SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z"); - - private AMQQueue _queue = null; - private String _queueName = null; - // OpenMBean data types for viewMessages method - - private static OpenType[] _msgAttributeTypes = new OpenType[5]; // AMQ message attribute types. - private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data. - private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list. - - // OpenMBean data types for viewMessageContent method - private static CompositeType _msgContentType = null; - private static OpenType[] _msgContentAttributeTypes = new OpenType[4]; - - private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length]; - private Notification _lastNotification = null; - - - - - @MBeanConstructor("Creates an MBean exposing an AMQQueue") - public AMQQueueMBean(AMQQueue queue) throws JMException - { - super(ManagedQueue.class, ManagedQueue.TYPE); - _queue = queue; - _queueName = queue.getName(); - } - - public ManagedObject getParentObject() - { - return _queue.getVirtualHost().getManagedObject(); - } - - static - { - try - { - init(); - } - catch (JMException ex) - { - // This is not expected to ever occur. - throw new RuntimeException("Got JMException in static initializer.", ex); - } - } - - /** - * initialises the openmbean data types - */ - private static void init() throws OpenDataException - { - _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id - _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType - _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding - _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content - _msgContentType = new CompositeType("Message Content", "AMQ Message Content", - VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES_DESC.size()]), - VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES_DESC.size()]), - _msgContentAttributeTypes); - - _msgAttributeTypes[0] = SimpleType.LONG; // For message id - _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes - _msgAttributeTypes[2] = SimpleType.LONG; // For size - _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered - _msgAttributeTypes[4] = SimpleType.LONG; // For queue position - - _messageDataType = new CompositeType("Message", "AMQ Message", - VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.size()]), - VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.size()]), _msgAttributeTypes); - _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, - VIEW_MSGS_TABULAR_UNIQUE_INDEX.toArray(new String[VIEW_MSGS_TABULAR_UNIQUE_INDEX.size()])); - } - - public String getObjectInstanceName() - { - return ObjectName.quote(_queueName); - } - - public String getName() - { - return _queueName; - } - - public boolean isDurable() - { - return _queue.isDurable(); - } - - public String getOwner() - { - return String.valueOf(_queue.getOwner()); - } - - public boolean isAutoDelete() - { - return _queue.isAutoDelete(); - } - - public Integer getMessageCount() - { - return _queue.getMessageCount(); - } - - public Long getMaximumMessageSize() - { - return _queue.getMaximumMessageSize(); - } - - public Long getMaximumMessageAge() - { - return _queue.getMaximumMessageAge(); - } - - public void setMaximumMessageAge(Long maximumMessageAge) - { - _queue.setMaximumMessageAge(maximumMessageAge); - } - - public void setMaximumMessageSize(Long value) - { - _queue.setMaximumMessageSize(value); - } - - public Integer getConsumerCount() - { - return _queue.getConsumerCount(); - } - - public Integer getActiveConsumerCount() - { - return _queue.getActiveConsumerCount(); - } - - public Long getReceivedMessageCount() - { - return _queue.getReceivedMessageCount(); - } - - public Long getMaximumMessageCount() - { - return _queue.getMaximumMessageCount(); - } - - public void setMaximumMessageCount(Long value) - { - _queue.setMaximumMessageCount(value); - } - - /** - * returns the maximum total size of messages(bytes) in the queue. - */ - public Long getMaximumQueueDepth() - { - return _queue.getMaximumQueueDepth(); - } - - public void setMaximumQueueDepth(Long value) - { - _queue.setMaximumQueueDepth(value); - } - - /** - * returns the total size of messages(bytes) in the queue. - */ - public Long getQueueDepth() throws JMException - { - return _queue.getQueueDepth(); - } - - public Long getCapacity() - { - return _queue.getCapacity(); - } - - public void setCapacity(Long capacity) throws IllegalArgumentException - { - if( _queue.getFlowResumeCapacity() > capacity ) - { - throw new IllegalArgumentException("Capacity must not be less than FlowResumeCapacity"); - } - - _queue.setCapacity(capacity); - } - - public Long getFlowResumeCapacity() - { - return _queue.getFlowResumeCapacity(); - } - - public void setFlowResumeCapacity(Long flowResumeCapacity) throws IllegalArgumentException - { - if( _queue.getCapacity() < flowResumeCapacity ) - { - throw new IllegalArgumentException("FlowResumeCapacity must not exceed Capacity"); - } - - _queue.setFlowResumeCapacity(flowResumeCapacity); - } - - public boolean isFlowOverfull() - { - return _queue.isOverfull(); - } - - public boolean isExclusive() - { - return _queue.isExclusive(); - } - - public void setExclusive(boolean exclusive) throws JMException - { - try - { - _queue.setExclusive(exclusive); - } - catch (AMQException e) - { - throw new JMException(e.toString()); - } - } - - /** - * Checks if there is any notification to be send to the listeners - */ - public void checkForNotification(ServerMessage msg) throws AMQException - { - - final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks(); - - if(!notificationChecks.isEmpty()) - { - final long currentTime = System.currentTimeMillis(); - final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap(); - - for (NotificationCheck check : notificationChecks) - { - if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime)) - { - if (check.notifyIfNecessary(msg, _queue, this)) - { - _lastNotificationTimes[check.ordinal()] = currentTime; - } - } - } - } - - } - - /** - * Sends the notification to the listeners - */ - public void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg) - { - // important : add log to the log file - monitoring tools may be looking for this - _logger.info(notification.name() + " On Queue " + queue.getNameShortString() + " - " + notificationMsg); - notificationMsg = notification.name() + " " + notificationMsg; - - _lastNotification = - new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber, - System.currentTimeMillis(), notificationMsg); - - _broadcaster.sendNotification(_lastNotification); - } - - public Notification getLastNotification() - { - return _lastNotification; - } - - /** - * @see AMQQueue#deleteMessageFromTop - */ - public void deleteMessageFromTop() throws JMException - { - _queue.deleteMessageFromTop(); - } - - /** - * Clears the queue of non-acquired messages - * - * @return the number of messages deleted - * @see AMQQueue#clearQueue - */ - public Long clearQueue() throws JMException - { - try - { - return _queue.clearQueue(); - } - catch (AMQException ex) - { - throw new MBeanException(ex, "Error clearing queue " + _queueName); - } - } - - /** - * returns message content as byte array and related attributes for the given message id. - */ - public CompositeData viewMessageContent(long msgId) throws JMException - { - QueueEntry entry = _queue.getMessageOnTheQueue(msgId); - - if (entry == null) - { - throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName); - } - - ServerMessage serverMsg = entry.getMessage(); - final int bodySize = (int) serverMsg.getSize(); - - - List<Byte> msgContent = new ArrayList<Byte>(); - - java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(bodySize); - int position = 0; - - while(position < bodySize) - { - position += serverMsg.getContent(buf, position); - buf.flip(); - for(int i = 0; i < buf.limit(); i++) - { - msgContent.add(buf.get(i)); - } - buf.clear(); - } - - AMQMessageHeader header = serverMsg.getMessageHeader(); - - String mimeType = null, encoding = null; - if (header != null) - { - mimeType = header.getMimeType(); - - encoding = header.getEncoding(); - } - - - Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) }; - - return new CompositeDataSupport(_msgContentType, - VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES_DESC.toArray( - new String[VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES_DESC.size()]), itemValues); - - } - - /** - * Returns the header contents of the messages stored in this queue in tabular form. - * Deprecated as of Qpid JMX API 1.3 - */ - @Deprecated - public TabularData viewMessages(int beginIndex, int endIndex) throws JMException - { - return viewMessages((long)beginIndex,(long)endIndex); - } - - - /** - * Returns the header contents of the messages stored in this queue in tabular form. - * @param startPosition The queue position of the first message to be viewed - * @param endPosition The queue position of the last message to be viewed - */ - public TabularData viewMessages(long startPosition, long endPosition) throws JMException - { - if ((startPosition > endPosition) || (startPosition < 1)) - { - throw new OperationsException("From Index = " + startPosition + ", To Index = " + endPosition - + "\n\"From Index\" should be greater than 0 and less than \"To Index\""); - } - - if ((endPosition - startPosition) > Integer.MAX_VALUE) - { - throw new OperationsException("Specified MessageID interval is too large. Intervals must be less than 2^31 in size"); - } - - List<QueueEntry> list = _queue.getMessagesRangeOnTheQueue(startPosition,endPosition); - TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType); - - try - { - // Create the tabular list of message header contents - int size = list.size(); - - for (int i = 0; i < size ; i++) - { - long position = startPosition + i; - final QueueEntry queueEntry = list.get(i); - ServerMessage serverMsg = queueEntry.getMessage(); - - String[] headerAttributes = null; - Object[] itemValues = null; - - if(serverMsg instanceof AMQMessage) - { - AMQMessage msg = (AMQMessage) serverMsg; - ContentHeaderBody headerBody = msg.getContentHeaderBody(); - // Create header attributes list - headerAttributes = getMessageHeaderProperties(headerBody); - itemValues = new Object[]{msg.getMessageId(), headerAttributes, headerBody.bodySize, queueEntry.isRedelivered(), position}; - } - else if(serverMsg instanceof MessageTransferMessage) - { - // We have a 0-10 message - MessageTransferMessage msg = (MessageTransferMessage) serverMsg; - - // Create header attributes list - headerAttributes = getMessageTransferMessageHeaderProps(msg); - itemValues = new Object[]{msg.getMessageNumber(), headerAttributes, msg.getSize(), queueEntry.isRedelivered(), position}; - } - else - { - //unknown message - headerAttributes = new String[]{"N/A"}; - itemValues = new Object[]{serverMsg.getMessageNumber(), headerAttributes, serverMsg.getSize(), queueEntry.isRedelivered(), position}; - } - - CompositeData messageData = new CompositeDataSupport(_messageDataType, - VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.size()]), itemValues); - _messageList.put(messageData); - } - } - catch (AMQException e) - { - JMException jme = new JMException("Error creating message contents: " + e); - jme.initCause(e); - throw jme; - } - - return _messageList; - } - - private String[] getMessageHeaderProperties(ContentHeaderBody headerBody) - { - List<String> list = new ArrayList<String>(); - BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.getProperties(); - list.add("reply-to = " + headerProperties.getReplyToAsString()); - list.add("propertyFlags = " + headerProperties.getPropertyFlags()); - list.add("ApplicationID = " + headerProperties.getAppIdAsString()); - list.add("ClusterID = " + headerProperties.getClusterIdAsString()); - list.add("UserId = " + headerProperties.getUserIdAsString()); - list.add("JMSMessageID = " + headerProperties.getMessageIdAsString()); - list.add("JMSCorrelationID = " + headerProperties.getCorrelationIdAsString()); - - int delMode = headerProperties.getDeliveryMode(); - list.add("JMSDeliveryMode = " + - ((delMode == BasicContentHeaderProperties.PERSISTENT) ? "Persistent" : "Non_Persistent")); - - list.add("JMSPriority = " + headerProperties.getPriority()); - list.add("JMSType = " + headerProperties.getType()); - - long longDate = headerProperties.getExpiration(); - String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null; - list.add("JMSExpiration = " + strDate); - - longDate = headerProperties.getTimestamp(); - strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null; - list.add("JMSTimestamp = " + strDate); - - return list.toArray(new String[list.size()]); - } - - private String[] getMessageTransferMessageHeaderProps(MessageTransferMessage msg) - { - List<String> list = new ArrayList<String>(); - - AMQMessageHeader header = msg.getMessageHeader(); - MessageProperties msgProps = msg.getHeader().get(MessageProperties.class); - - String appID = null; - String userID = null; - - if(msgProps != null) - { - appID = msgProps.getAppId() == null ? "null" : new String(msgProps.getAppId()); - userID = msgProps.getUserId() == null ? "null" : new String(msgProps.getUserId()); - } - - list.add("reply-to = " + header.getReplyTo()); - list.add("propertyFlags = "); //TODO - list.add("ApplicationID = " + appID); - list.add("ClusterID = "); //TODO - list.add("UserId = " + userID); - list.add("JMSMessageID = " + header.getMessageId()); - list.add("JMSCorrelationID = " + header.getCorrelationId()); - list.add("JMSDeliveryMode = " + (msg.isPersistent() ? "Persistent" : "Non_Persistent")); - list.add("JMSPriority = " + header.getPriority()); - list.add("JMSType = " + header.getType()); - - long longDate = header.getExpiration(); - String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null; - list.add("JMSExpiration = " + strDate); - - longDate = header.getTimestamp(); - strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null; - list.add("JMSTimestamp = " + strDate); - - return list.toArray(new String[list.size()]); - } - - /** - * @see ManagedQueue#moveMessages - * @param fromMessageId - * @param toMessageId - * @param toQueueName - * @throws JMException - */ - public void moveMessages(long fromMessageId, long toMessageId, String toQueueName) throws JMException - { - if ((fromMessageId > toMessageId) || (fromMessageId < 1)) - { - throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\""); - } - - ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog()); - _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn); - txn.commit(); - } - - /** - * @see ManagedQueue#deleteMessages - * @param fromMessageId - * @param toMessageId - * @throws JMException - */ - public void deleteMessages(long fromMessageId, long toMessageId) throws JMException - { - if ((fromMessageId > toMessageId) || (fromMessageId < 1)) - { - throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\""); - } - - _queue.removeMessagesFromQueue(fromMessageId, toMessageId); - } - - /** - * @see ManagedQueue#copyMessages - * @param fromMessageId - * @param toMessageId - * @param toQueueName - * @throws JMException - */ - public void copyMessages(long fromMessageId, long toMessageId, String toQueueName) throws JMException - { - if ((fromMessageId > toMessageId) || (fromMessageId < 1)) - { - throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\""); - } - - ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog()); - - _queue.copyMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn); - - txn.commit(); - - - } - - /** - * returns Notifications sent by this MBean. - */ - @Override - public MBeanNotificationInfo[] getNotificationInfo() - { - String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED }; - String name = MonitorNotification.class.getName(); - String description = "Either Message count or Queue depth or Message size has reached threshold high value"; - MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description); - - return new MBeanNotificationInfo[] { info1 }; - } - -} // End of AMQQueueMBean class diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java deleted file mode 100644 index 05e0efd9a6..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; - -public interface BaseQueue extends TransactionLogResource -{ - public static interface PostEnqueueAction - { - public void onEnqueue(QueueEntry entry); - } - - void enqueue(ServerMessage message) throws AMQException; - void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException; - - boolean isDurable(); - - AMQShortString getNameShortString(); -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java deleted file mode 100644 index b5293f51be..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.virtualhost.VirtualHost; - -import java.util.Map; - -public class ConflationQueue extends SimpleAMQQueue -{ - protected ConflationQueue(String name, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - VirtualHost virtualHost, - Map<String, Object> args, - String conflationKey) - { - super(name, durable, owner, autoDelete, exclusive, virtualHost, new ConflationQueueList.Factory(conflationKey), args); - } - - public String getConflationKey() - { - return ((ConflationQueueList) _entries).getConflationKey(); - } - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java deleted file mode 100644 index 2c1883e763..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.queue; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.server.txn.ServerTransaction; - -public class ConflationQueueList extends SimpleQueueEntryList -{ - - private final String _conflationKey; - private final ConcurrentHashMap<Object, AtomicReference<QueueEntry>> _latestValuesMap = - new ConcurrentHashMap<Object, AtomicReference<QueueEntry>>(); - - public ConflationQueueList(AMQQueue queue, String conflationKey) - { - super(queue); - _conflationKey = conflationKey; - } - - public String getConflationKey() - { - return _conflationKey; - } - - @Override - protected ConflationQueueEntry createQueueEntry(ServerMessage message) - { - return new ConflationQueueEntry(this, message); - } - - - @Override - public QueueEntry add(final ServerMessage message) - { - ConflationQueueEntry entry = (ConflationQueueEntry) (super.add(message)); - AtomicReference<QueueEntry> latestValueReference = null; - - Object value = message.getMessageHeader().getHeader(_conflationKey); - if(value != null) - { - latestValueReference = _latestValuesMap.get(value); - if(latestValueReference == null) - { - _latestValuesMap.putIfAbsent(value, new AtomicReference<QueueEntry>(entry)); - latestValueReference = _latestValuesMap.get(value); - } - QueueEntry oldEntry; - - do - { - oldEntry = latestValueReference.get(); - } - while(oldEntry.compareTo(entry) < 0 && !latestValueReference.compareAndSet(oldEntry, entry)); - - if(oldEntry.compareTo(entry) < 0) - { - // We replaced some other entry to become the newest value - if(oldEntry.acquire()) - { - discardEntry(oldEntry); - } - } - else if (oldEntry.compareTo(entry) > 0) - { - // A newer entry came along - discardEntry(entry); - - } - } - - entry.setLatestValueReference(latestValueReference); - return entry; - } - - private void discardEntry(final QueueEntry entry) - { - if(entry.acquire()) - { - ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog()); - txn.dequeue(entry.getQueue(),entry.getMessage(), - new ServerTransaction.Action() - { - public void postCommit() - { - entry.discard(); - } - - public void onRollback() - { - - } - }); - } - } - - private final class ConflationQueueEntry extends QueueEntryImpl - { - - - private AtomicReference<QueueEntry> _latestValueReference; - - public ConflationQueueEntry(SimpleQueueEntryList queueEntryList, ServerMessage message) - { - super(queueEntryList, message); - } - - - public void release() - { - super.release(); - - if(_latestValueReference != null) - { - if(_latestValueReference.get() != this) - { - discardEntry(this); - } - } - - } - - public void setLatestValueReference(final AtomicReference<QueueEntry> latestValueReference) - { - _latestValueReference = latestValueReference; - } - } - - static class Factory implements QueueEntryListFactory - { - private final String _conflationKey; - - Factory(String conflationKey) - { - _conflationKey = conflationKey; - } - - public QueueEntryList createQueueEntryList(AMQQueue queue) - { - return new ConflationQueueList(queue, _conflationKey); - } - } - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java deleted file mode 100644 index d76487073d..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import java.util.Collection; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -public class DefaultQueueRegistry implements QueueRegistry -{ - private ConcurrentMap<AMQShortString, AMQQueue> _queueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>(); - - private final VirtualHost _virtualHost; - - public DefaultQueueRegistry(VirtualHost virtualHost) - { - _virtualHost = virtualHost; - } - - public VirtualHost getVirtualHost() - { - return _virtualHost; - } - - public void registerQueue(AMQQueue queue) - { - _queueMap.put(queue.getNameShortString(), queue); - } - - public void unregisterQueue(AMQShortString name) - { - _queueMap.remove(name); - } - - public AMQQueue getQueue(AMQShortString name) - { - return _queueMap.get(name); - } - - public Collection<AMQShortString> getQueueNames() - { - return _queueMap.keySet(); - } - - public Collection<AMQQueue> getQueues() - { - return _queueMap.values(); - } - - public AMQQueue getQueue(String queue) - { - return getQueue(new AMQShortString(queue)); - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java deleted file mode 100644 index 6466e81dd2..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; - -/** - * Signals that the dequeue of a message from a queue failed. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Indicates the a message could not be dequeued from a queue. - * <tr><td> - * </table> - * - * @todo Not an AMQP exception as no status code. - * - * @todo Happens as a consequence of a message store failure, or reference counting error. Both of which migh become - * runtime exceptions, as unrecoverable conditions? In which case this one might be dropped too. - */ -public class FailedDequeueException extends AMQException -{ - public FailedDequeueException(String queue) - { - super("Failed to dequeue message from " + queue); - } - - public FailedDequeueException(String queue, AMQException e) - { - super("Failed to dequeue message from " + queue, e); - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java deleted file mode 100644 index eaa3992e98..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java +++ /dev/null @@ -1,34 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.queue; - -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.message.AMQMessageHeader; - -public interface Filterable -{ - AMQMessageHeader getMessageHeader(); - - boolean isPersistent(); - - boolean isRedelivered(); -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java deleted file mode 100755 index 77da08d8c4..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.message.AMQMessageHeader; - -class InboundMessageAdapter implements InboundMessage -{ - - private QueueEntry _entry; - - InboundMessageAdapter() - { - } - - InboundMessageAdapter(QueueEntry entry) - { - _entry = entry; - } - - public void setEntry(QueueEntry entry) - { - _entry = entry; - } - - - public String getRoutingKey() - { - return _entry.getMessage().getRoutingKey(); - } - - public AMQMessageHeader getMessageHeader() - { - return _entry.getMessageHeader(); - } - - public boolean isPersistent() - { - return _entry.isPersistent(); - } - - public boolean isRedelivered() - { - return _entry.isRedelivered(); - } - - public long getSize() - { - return _entry.getSize(); - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java deleted file mode 100644 index 3e3288404f..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ /dev/null @@ -1,300 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.message.EnqueableMessage; -import org.apache.qpid.server.message.MessageContentSource; -import org.apache.qpid.server.message.MessageMetaData; -import org.apache.qpid.AMQException; -import org.apache.log4j.Logger; - -import java.util.ArrayList; -import java.util.List; -import java.nio.ByteBuffer; - -public class IncomingMessage implements Filterable, InboundMessage, EnqueableMessage, MessageContentSource -{ - - /** Used for debugging purposes. */ - private static final Logger _logger = Logger.getLogger(IncomingMessage.class); - - private static final boolean SYNCHED_CLOCKS = - ApplicationRegistry.getInstance().getConfiguration().getSynchedClocks(); - - private final MessagePublishInfo _messagePublishInfo; - private ContentHeaderBody _contentHeaderBody; - - - /** - * Keeps a track of how many bytes we have received in body frames - */ - private long _bodyLengthReceived = 0; - - /** - * This is stored during routing, to know the queues to which this message should immediately be - * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done - * by the message handle. - */ - private ArrayList<? extends BaseQueue> _destinationQueues; - - private long _expiration; - - private Exchange _exchange; - - - private int _receivedChunkCount = 0; - private List<ContentChunk> _contentChunks = new ArrayList<ContentChunk>(); - - // we keep both the original meta data object and the store reference to it just in case the - // store would otherwise flow it to disk - - private MessageMetaData _messageMetaData; - - private StoredMessage<MessageMetaData> _storedMessageHandle; - - - public IncomingMessage( - final MessagePublishInfo info - ) - { - _messagePublishInfo = info; - } - - public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException - { - _contentHeaderBody = contentHeaderBody; - } - - public void setExpiration() - { - long expiration = - ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getExpiration(); - long timestamp = - ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getTimestamp(); - - if (SYNCHED_CLOCKS) - { - _expiration = expiration; - } - else - { - // Update TTL to be in broker time. - if (expiration != 0L) - { - if (timestamp != 0L) - { - // todo perhaps use arrival time - long diff = (System.currentTimeMillis() - timestamp); - - if ((diff > 1000L) || (diff < 1000L)) - { - _expiration = expiration + diff; - } - } - } - } - - } - - public MessageMetaData headersReceived() - { - _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0); - return _messageMetaData; - } - - - public ArrayList<? extends BaseQueue> getDestinationQueues() - { - return _destinationQueues; - } - - public int addContentBodyFrame(final ContentChunk contentChunk) - throws AMQException - { - _storedMessageHandle.addContent((int)_bodyLengthReceived, contentChunk.getData().buf()); - _bodyLengthReceived += contentChunk.getSize(); - _contentChunks.add(contentChunk); - - - - return _receivedChunkCount++; - } - - public boolean allContentReceived() - { - return (_bodyLengthReceived == getContentHeader().bodySize); - } - - public AMQShortString getExchange() - { - return _messagePublishInfo.getExchange(); - } - - public String getRoutingKey() - { - return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString(); - } - - public String getBinding() - { - return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString(); - } - - - public boolean isMandatory() - { - return _messagePublishInfo.isMandatory(); - } - - - public boolean isImmediate() - { - return _messagePublishInfo.isImmediate(); - } - - public ContentHeaderBody getContentHeader() - { - return _contentHeaderBody; - } - - - public AMQMessageHeader getMessageHeader() - { - return _messageMetaData.getMessageHeader(); - } - - public boolean isPersistent() - { - return getContentHeader().getProperties() instanceof BasicContentHeaderProperties && - ((BasicContentHeaderProperties) getContentHeader().getProperties()).getDeliveryMode() == - BasicContentHeaderProperties.PERSISTENT; - } - - public boolean isRedelivered() - { - return false; - } - - - public long getSize() - { - return getContentHeader().bodySize; - } - - public Long getMessageNumber() - { - return _storedMessageHandle.getMessageNumber(); - } - - public void setExchange(final Exchange e) - { - _exchange = e; - } - - public void route() - { - enqueue(_exchange.route(this)); - - } - - public void enqueue(final ArrayList<? extends BaseQueue> queues) - { - _destinationQueues = queues; - } - - public MessagePublishInfo getMessagePublishInfo() - { - return _messagePublishInfo; - } - - public long getExpiration() - { - return _expiration; - } - - public int getReceivedChunkCount() - { - return _receivedChunkCount; - } - - - public int getBodyCount() throws AMQException - { - return _contentChunks.size(); - } - - public ContentChunk getContentChunk(int index) throws IllegalArgumentException, AMQException - { - return _contentChunks.get(index); - } - - - public int getContent(ByteBuffer buf, int offset) - { - int pos = 0; - int written = 0; - for(ContentChunk cb : _contentChunks) - { - ByteBuffer data = cb.getData().buf(); - if(offset+written >= pos && offset < pos + data.limit()) - { - ByteBuffer src = data.duplicate(); - src.position(offset+written - pos); - src = src.slice(); - - if(buf.remaining() < src.limit()) - { - src.limit(buf.remaining()); - } - int count = src.limit(); - buf.put(src); - written += count; - if(buf.remaining() == 0) - { - break; - } - } - pos+=data.limit(); - } - return written; - - } - - public void setStoredMessage(StoredMessage<MessageMetaData> storedMessageHandle) - { - _storedMessageHandle = storedMessageHandle; - } - - public StoredMessage<MessageMetaData> getStoredMessage() - { - return _storedMessageHandle; - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java deleted file mode 100644 index 090096d3c3..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; - -/** - * MessageCleanupException represents the failure to perform reference counting on messages correctly. This should not - * happen, but there may be programming errors giving race conditions that cause the reference counting to go wrong. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Signals that the reference count of a message has gone below zero. - * <tr><td> Indicates that a message store has lost a message which is still referenced. - * </table> - * - * @todo Not an AMQP exception as no status code. - * - * @todo The race conditions leading to this error should be cleaned up, and a runtime exception used instead. If the - * message store loses messages, then something is seriously wrong and it would be sensible to terminate the - * broker. This may be disguising out of memory errors. - */ -public class MessageCleanupException extends AMQException -{ - public MessageCleanupException(long messageId, AMQException e) - { - super("Failed to cleanup message with id " + messageId, e); - } - - public MessageCleanupException(String message) - { - super(message); - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java deleted file mode 100644 index d1fb0f3fe6..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java +++ /dev/null @@ -1,132 +0,0 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.message.ServerMessage;
-
-public enum NotificationCheck
-{
-
- MESSAGE_COUNT_ALERT
- {
- boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
- {
- int msgCount;
- final long maximumMessageCount = queue.getMaximumMessageCount();
- if (maximumMessageCount!= 0 && (msgCount = queue.getMessageCount()) >= maximumMessageCount)
- {
- listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
- return true;
- }
- return false;
- }
- },
- MESSAGE_SIZE_ALERT(true)
- {
- boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
- {
- final long maximumMessageSize = queue.getMaximumMessageSize();
- if(maximumMessageSize != 0)
- {
- // Check for threshold message size
- long messageSize;
- messageSize = (msg == null) ? 0 : msg.getSize();
-
-
- if (messageSize >= maximumMessageSize)
- {
- listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageNumber() + "]");
- return true;
- }
- }
- return false;
- }
-
- },
- QUEUE_DEPTH_ALERT
- {
- boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
- {
- // Check for threshold queue depth in bytes
- final long maximumQueueDepth = queue.getMaximumQueueDepth();
-
- if(maximumQueueDepth != 0)
- {
- final long queueDepth = queue.getQueueDepth();
-
- if (queueDepth >= maximumQueueDepth)
- {
- listener.notifyClients(this, queue, (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached.");
- return true;
- }
- }
- return false;
- }
-
- },
- MESSAGE_AGE_ALERT
- {
- boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
- {
-
- final long maxMessageAge = queue.getMaximumMessageAge();
- if(maxMessageAge != 0)
- {
- final long currentTime = System.currentTimeMillis();
- final long thresholdTime = currentTime - maxMessageAge;
- final long firstArrivalTime = queue.getOldestMessageArrivalTime();
-
- if(firstArrivalTime < thresholdTime)
- {
- long oldestAge = currentTime - firstArrivalTime;
- listener.notifyClients(this, queue, (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached.");
-
- return true;
- }
- }
- return false;
-
- }
-
- }
- ;
-
- private final boolean _messageSpecific;
-
- NotificationCheck()
- {
- this(false);
- }
-
- NotificationCheck(boolean messageSpecific)
- {
- _messageSpecific = messageSpecific;
- }
-
- public boolean isMessageSpecific()
- {
- return _messageSpecific;
- }
-
- abstract boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener);
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java deleted file mode 100644 index 0c6b84d2b6..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java +++ /dev/null @@ -1,162 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.queue; - -import org.apache.qpid.framing.CommonContentHeaderProperties; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.message.ServerMessage; - -public class PriorityQueueList implements QueueEntryList -{ - private final AMQQueue _queue; - private final QueueEntryList[] _priorityLists; - private final int _priorities; - private final int _priorityOffset; - - public PriorityQueueList(AMQQueue queue, int priorities) - { - _queue = queue; - _priorityLists = new QueueEntryList[priorities]; - _priorities = priorities; - _priorityOffset = 5-((priorities + 1)/2); - for(int i = 0; i < priorities; i++) - { - _priorityLists[i] = new SimpleQueueEntryList(queue); - } - } - - public int getPriorities() - { - return _priorities; - } - - public AMQQueue getQueue() - { - return _queue; - } - - public QueueEntry add(ServerMessage message) - { - int index = message.getMessageHeader().getPriority() - _priorityOffset; - if(index >= _priorities) - { - index = _priorities-1; - } - else if(index < 0) - { - index = 0; - } - return _priorityLists[index].add(message); - - } - - public QueueEntry next(QueueEntry node) - { - QueueEntryImpl nodeImpl = (QueueEntryImpl)node; - QueueEntry next = nodeImpl.getNext(); - - if(next == null) - { - QueueEntryList nodeEntryList = nodeImpl.getQueueEntryList(); - int index; - for(index = _priorityLists.length-1; _priorityLists[index] != nodeEntryList; index--); - - while(next == null && index != 0) - { - index--; - next = ((QueueEntryImpl)_priorityLists[index].getHead()).getNext(); - } - - } - return next; - } - - private final class PriorityQueueEntryListIterator implements QueueEntryIterator - { - private final QueueEntryIterator[] _iterators = new QueueEntryIterator[ _priorityLists.length ]; - private QueueEntry _lastNode; - - PriorityQueueEntryListIterator() - { - for(int i = 0; i < _priorityLists.length; i++) - { - _iterators[i] = _priorityLists[i].iterator(); - } - _lastNode = _iterators[_iterators.length - 1].getNode(); - } - - - public boolean atTail() - { - for(int i = 0; i < _iterators.length; i++) - { - if(!_iterators[i].atTail()) - { - return false; - } - } - return true; - } - - public QueueEntry getNode() - { - return _lastNode; - } - - public boolean advance() - { - for(int i = _iterators.length-1; i >= 0; i--) - { - if(_iterators[i].advance()) - { - _lastNode = _iterators[i].getNode(); - return true; - } - } - return false; - } - } - - public QueueEntryIterator iterator() - { - return new PriorityQueueEntryListIterator(); - } - - public QueueEntry getHead() - { - return _priorityLists[_priorities-1].getHead(); - } - - static class Factory implements QueueEntryListFactory - { - private final int _priorities; - - Factory(int priorities) - { - _priorities = priorities; - } - - public QueueEntryList createQueueEntryList(AMQQueue queue) - { - return new PriorityQueueList(queue, _priorities); - } - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java deleted file mode 100755 index 825a85a89c..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.queue; - -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - -final class QueueContext implements AMQQueue.Context -{ - volatile QueueEntry _lastSeenEntry; - volatile QueueEntry _releasedEntry; - - static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry> - _lastSeenUpdater = - AtomicReferenceFieldUpdater.newUpdater - (QueueContext.class, QueueEntry.class, "_lastSeenEntry"); - static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry> - _releasedUpdater = - AtomicReferenceFieldUpdater.newUpdater - (QueueContext.class, QueueEntry.class, "_releasedEntry"); - - public QueueContext(QueueEntry head) - { - _lastSeenEntry = head; - } - - public QueueEntry getLastSeenEntry() - { - return _lastSeenEntry; - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java deleted file mode 100644 index 79ede2694e..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ /dev/null @@ -1,210 +0,0 @@ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.message.ServerMessage; - -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -public interface QueueEntry extends Comparable<QueueEntry>, Filterable -{ - - - - public static enum State - { - AVAILABLE, - ACQUIRED, - EXPIRED, - DEQUEUED, - DELETED; - - - } - - public static interface StateChangeListener - { - public void stateChanged(QueueEntry entry, State oldSate, State newState); - } - - public abstract class EntryState - { - private EntryState() - { - } - - public abstract State getState(); - } - - - public final class AvailableState extends EntryState - { - - public State getState() - { - return State.AVAILABLE; - } - } - - - public final class DequeuedState extends EntryState - { - - public State getState() - { - return State.DEQUEUED; - } - } - - - public final class DeletedState extends EntryState - { - - public State getState() - { - return State.DELETED; - } - } - - public final class ExpiredState extends EntryState - { - - public State getState() - { - return State.EXPIRED; - } - } - - - public final class NonSubscriptionAcquiredState extends EntryState - { - public State getState() - { - return State.ACQUIRED; - } - } - - public final class SubscriptionAcquiredState extends EntryState - { - private final Subscription _subscription; - - public SubscriptionAcquiredState(Subscription subscription) - { - _subscription = subscription; - } - - - public State getState() - { - return State.ACQUIRED; - } - - public Subscription getSubscription() - { - return _subscription; - } - } - - public final class SubscriptionAssignedState extends EntryState - { - private final Subscription _subscription; - - public SubscriptionAssignedState(Subscription subscription) - { - _subscription = subscription; - } - - - public State getState() - { - return State.AVAILABLE; - } - - public Subscription getSubscription() - { - return _subscription; - } - } - - - final static EntryState AVAILABLE_STATE = new AvailableState(); - final static EntryState DELETED_STATE = new DeletedState(); - final static EntryState DEQUEUED_STATE = new DequeuedState(); - final static EntryState EXPIRED_STATE = new ExpiredState(); - final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState(); - - - - - AMQQueue getQueue(); - - ServerMessage getMessage(); - - long getSize(); - - boolean getDeliveredToConsumer(); - - boolean expired() throws AMQException; - - boolean isAvailable(); - - boolean isAcquired(); - - boolean acquire(); - boolean acquire(Subscription sub); - - boolean delete(); - boolean isDeleted(); - - boolean acquiredBySubscription(); - boolean isAcquiredBy(Subscription subscription); - - void release(); - boolean releaseButRetain(); - - - boolean immediateAndNotDelivered(); - - void setRedelivered(); - - boolean isRedelivered(); - - Subscription getDeliveredSubscription(); - - void reject(); - - void reject(Subscription subscription); - - boolean isRejectedBy(Subscription subscription); - - void dequeue(); - - void dispose(); - - void discard(); - - void routeToAlternate(); - - boolean isQueueDeleted(); - - void addStateChangeListener(StateChangeListener listener); - boolean removeStateChangeListener(StateChangeListener listener); -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java deleted file mode 100644 index 809ba3277e..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ /dev/null @@ -1,550 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.server.txn.ServerTransaction; - -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - - -public class QueueEntryImpl implements QueueEntry -{ - - /** - * Used for debugging purposes. - */ - private static final Logger _log = Logger.getLogger(QueueEntryImpl.class); - - private final SimpleQueueEntryList _queueEntryList; - - private MessageReference _message; - - private Set<Subscription> _rejectedBy = null; - - private volatile EntryState _state = AVAILABLE_STATE; - - private static final - AtomicReferenceFieldUpdater<QueueEntryImpl, EntryState> - _stateUpdater = - AtomicReferenceFieldUpdater.newUpdater - (QueueEntryImpl.class, EntryState.class, "_state"); - - - private volatile Set<StateChangeListener> _stateChangeListeners; - - private static final - AtomicReferenceFieldUpdater<QueueEntryImpl, Set> - _listenersUpdater = - AtomicReferenceFieldUpdater.newUpdater - (QueueEntryImpl.class, Set.class, "_stateChangeListeners"); - - - private static final - AtomicLongFieldUpdater<QueueEntryImpl> - _entryIdUpdater = - AtomicLongFieldUpdater.newUpdater - (QueueEntryImpl.class, "_entryId"); - - - private volatile long _entryId; - - volatile QueueEntryImpl _next; - - private static final int DELIVERED_TO_CONSUMER = 1; - private static final int REDELIVERED = 2; - - private volatile int _deliveryState; - - - QueueEntryImpl(SimpleQueueEntryList queueEntryList) - { - this(queueEntryList,null,Long.MIN_VALUE); - _state = DELETED_STATE; - } - - - public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message, final long entryId) - { - _queueEntryList = queueEntryList; - - _message = message == null ? null : message.newReference(); - - _entryIdUpdater.set(this, entryId); - } - - public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message) - { - _queueEntryList = queueEntryList; - _message = message == null ? null : message.newReference(); - } - - protected void setEntryId(long entryId) - { - _entryIdUpdater.set(this, entryId); - } - - protected long getEntryId() - { - return _entryId; - } - - public AMQQueue getQueue() - { - return _queueEntryList.getQueue(); - } - - public ServerMessage getMessage() - { - return _message == null ? null : _message.getMessage(); - } - - public long getSize() - { - return getMessage() == null ? 0 : getMessage().getSize(); - } - - public boolean getDeliveredToConsumer() - { - return (_deliveryState & DELIVERED_TO_CONSUMER) != 0; - } - - public boolean expired() throws AMQException - { - ServerMessage message = getMessage(); - if(message != null) - { - long expiration = message.getExpiration(); - if (expiration != 0L) - { - long now = System.currentTimeMillis(); - - return (now > expiration); - } - } - return false; - - } - - public boolean isAvailable() - { - return _state == AVAILABLE_STATE; - } - - public boolean isAcquired() - { - return _state.getState() == State.ACQUIRED; - } - - public boolean acquire() - { - return acquire(NON_SUBSCRIPTION_ACQUIRED_STATE); - } - - private boolean acquire(final EntryState state) - { - boolean acquired = _stateUpdater.compareAndSet(this,AVAILABLE_STATE, state); - - // deal with the case where the node has been assigned to a given subscription already - // including the case that the node is assigned to a closed subscription - if(!acquired) - { - if(state != NON_SUBSCRIPTION_ACQUIRED_STATE) - { - EntryState currentState = _state; - if(currentState.getState() == State.AVAILABLE - && ((currentState == AVAILABLE_STATE) - || (((SubscriptionAcquiredState)state).getSubscription() == - ((SubscriptionAssignedState)currentState).getSubscription()) - || ((SubscriptionAssignedState)currentState).getSubscription().isClosed() )) - { - acquired = _stateUpdater.compareAndSet(this,currentState, state); - } - } - } - if(acquired && _stateChangeListeners != null) - { - notifyStateChange(State.AVAILABLE, State.ACQUIRED); - } - - return acquired; - } - - public boolean acquire(Subscription sub) - { - final boolean acquired = acquire(sub.getOwningState()); - if(acquired) - { - _deliveryState |= DELIVERED_TO_CONSUMER; - } - return acquired; - } - - public boolean acquiredBySubscription() - { - - return (_state instanceof SubscriptionAcquiredState); - } - - public boolean isAcquiredBy(Subscription subscription) - { - EntryState state = _state; - return state instanceof SubscriptionAcquiredState - && ((SubscriptionAcquiredState)state).getSubscription() == subscription; - } - - public void release() - { - EntryState state = _state; - - if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE)) - { - if(state instanceof SubscriptionAcquiredState) - { - getQueue().decrementUnackedMsgCount(); - } - - if(!getQueue().isDeleted()) - { - getQueue().requeue(this); - if(_stateChangeListeners != null) - { - notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE); - } - - } - else if(acquire()) - { - routeToAlternate(); - } - } - } - - public boolean releaseButRetain() - { - EntryState state = _state; - - boolean stateUpdated = false; - - if(state instanceof SubscriptionAcquiredState) - { - Subscription sub = ((SubscriptionAcquiredState) state).getSubscription(); - if(_stateUpdater.compareAndSet(this, state, sub.getAssignedState())) - { - System.err.println("Message released (and retained)" + getMessage().getMessageNumber()); - getQueue().requeue(this); - if(_stateChangeListeners != null) - { - notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE); - } - stateUpdated = true; - } - } - - return stateUpdated; - - } - - public boolean immediateAndNotDelivered() - { - return !getDeliveredToConsumer() && isImmediate(); - } - - private boolean isImmediate() - { - final ServerMessage message = getMessage(); - return message != null && message.isImmediate(); - } - - public void setRedelivered() - { - _deliveryState |= REDELIVERED; - } - - public AMQMessageHeader getMessageHeader() - { - final ServerMessage message = getMessage(); - return message == null ? null : message.getMessageHeader(); - } - - public boolean isPersistent() - { - final ServerMessage message = getMessage(); - return message != null && message.isPersistent(); - } - - public boolean isRedelivered() - { - return (_deliveryState & REDELIVERED) != 0; - } - - public Subscription getDeliveredSubscription() - { - EntryState state = _state; - if (state instanceof SubscriptionAcquiredState) - { - return ((SubscriptionAcquiredState) state).getSubscription(); - } - else - { - return null; - } - - } - - public void reject() - { - reject(getDeliveredSubscription()); - } - - public void reject(Subscription subscription) - { - if (subscription != null) - { - if (_rejectedBy == null) - { - _rejectedBy = new HashSet<Subscription>(); - } - - _rejectedBy.add(subscription); - } - else - { - _log.warn("Requesting rejection by null subscriber:" + this); - } - } - - public boolean isRejectedBy(Subscription subscription) - { - - if (_rejectedBy != null) // We have subscriptions that rejected this message - { - return _rejectedBy.contains(subscription); - } - else // This messasge hasn't been rejected yet. - { - return false; - } - } - - public void dequeue() - { - EntryState state = _state; - - if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE)) - { - Subscription s = null; - if (state instanceof SubscriptionAcquiredState) - { - getQueue().decrementUnackedMsgCount(); - s = ((SubscriptionAcquiredState) state).getSubscription(); - s.onDequeue(this); - } - - getQueue().dequeue(this,s); - if(_stateChangeListeners != null) - { - notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED); - } - - } - - } - - private void notifyStateChange(final State oldState, final State newState) - { - for(StateChangeListener l : _stateChangeListeners) - { - l.stateChanged(this, oldState, newState); - } - } - - public void dispose() - { - if(delete()) - { - _message.release(); - } - } - - public void discard() - { - //if the queue is null then the message is waiting to be acked, but has been removed. - if (getQueue() != null) - { - dequeue(); - } - - dispose(); - } - - public void routeToAlternate() - { - final AMQQueue currentQueue = getQueue(); - Exchange alternateExchange = currentQueue.getAlternateExchange(); - - if(alternateExchange != null) - { - final List<? extends BaseQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this)); - final ServerMessage message = getMessage(); - if(rerouteQueues != null && rerouteQueues.size() != 0) - { - ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog()); - - txn.enqueue(rerouteQueues, message, new ServerTransaction.Action() { - public void postCommit() - { - try - { - for(BaseQueue queue : rerouteQueues) - { - queue.enqueue(message); - } - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - } - - public void onRollback() - { - - } - }); - txn.dequeue(currentQueue,message, - new ServerTransaction.Action() - { - public void postCommit() - { - discard(); - } - - public void onRollback() - { - - } - }); - } - } - } - - public boolean isQueueDeleted() - { - return getQueue().isDeleted(); - } - - public void addStateChangeListener(StateChangeListener listener) - { - Set<StateChangeListener> listeners = _stateChangeListeners; - if(listeners == null) - { - _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener>()); - listeners = _stateChangeListeners; - } - - listeners.add(listener); - } - - public boolean removeStateChangeListener(StateChangeListener listener) - { - Set<StateChangeListener> listeners = _stateChangeListeners; - if(listeners != null) - { - return listeners.remove(listener); - } - - return false; - } - - - public int compareTo(final QueueEntry o) - { - QueueEntryImpl other = (QueueEntryImpl)o; - return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0; - } - - public QueueEntryImpl getNext() - { - - QueueEntryImpl next = nextNode(); - while(next != null && next.isDeleted()) - { - - final QueueEntryImpl newNext = next.nextNode(); - if(newNext != null) - { - SimpleQueueEntryList._nextUpdater.compareAndSet(this,next, newNext); - next = nextNode(); - } - else - { - next = null; - } - - } - return next; - } - - QueueEntryImpl nextNode() - { - return _next; - } - - public boolean isDeleted() - { - return _state == DELETED_STATE; - } - - public boolean delete() - { - EntryState state = _state; - - if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE)) - { - _queueEntryList.advanceHead(); - return true; - } - else - { - return false; - } - } - - public QueueEntryList getQueueEntryList() - { - return _queueEntryList; - } - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java deleted file mode 100644 index c5c115a2d1..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java +++ /dev/null @@ -1,30 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.queue; - -public interface QueueEntryIterator -{ - boolean atTail(); - - QueueEntry getNode(); - - boolean advance(); -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java deleted file mode 100644 index b4042ce02c..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ /dev/null @@ -1,36 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.message.ServerMessage; - -public interface QueueEntryList -{ - AMQQueue getQueue(); - - QueueEntry add(ServerMessage message); - - QueueEntry next(QueueEntry node); - - QueueEntryIterator iterator(); - - QueueEntry getHead(); -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java deleted file mode 100644 index 4dbce45f67..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java +++ /dev/null @@ -1,26 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.queue; - -interface QueueEntryListFactory -{ - public QueueEntryList createQueueEntryList(AMQQueue queue); -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java deleted file mode 100644 index 959ca03c80..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java +++ /dev/null @@ -1,27 +0,0 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-
-public interface QueueNotificationListener
-{
- void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg);
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java deleted file mode 100644 index a537e0c83f..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import java.util.Collection; - -public interface QueueRegistry -{ - VirtualHost getVirtualHost(); - - void registerQueue(AMQQueue queue); - - void unregisterQueue(AMQShortString name); - - AMQQueue getQueue(AMQShortString name); - - Collection<AMQShortString> getQueueNames(); - - Collection<AMQQueue> getQueues(); - - AMQQueue getQueue(String queue); -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java deleted file mode 100644 index 7e1d57e205..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.pool.ReadWriteRunnable; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.queue.QueueRunner; -import org.apache.qpid.server.queue.SimpleAMQQueue; - -/** - * QueueRunners are Runnables used to process a queue when requiring - * asynchronous message delivery to subscriptions, which is necessary - * when straight-through delivery of a message to a subscription isn't - * possible during the enqueue operation. - */ -public class QueueRunner implements ReadWriteRunnable -{ - private static final Logger _logger = Logger.getLogger(QueueRunner.class); - - private final String _name; - private final SimpleAMQQueue _queue; - - public QueueRunner(SimpleAMQQueue queue, long count) - { - _queue = queue; - _name = "QueueRunner-" + count + "-" + queue.getLogActor(); - } - - public void run() - { - String originalName = Thread.currentThread().getName(); - try - { - Thread.currentThread().setName(_name); - CurrentActor.set(_queue.getLogActor()); - - _queue.processQueue(this); - } - catch (AMQException e) - { - _logger.error("Exception during asynchronous delivery by " + _name, e); - } - finally - { - CurrentActor.remove(); - Thread.currentThread().setName(originalName); - } - } - - public boolean isRead() - { - return false; - } - - public boolean isWrite() - { - return true; - } - - public String toString() - { - return _name; - } -}
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java deleted file mode 100644 index b02d03a1ad..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ /dev/null @@ -1,2233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.server.queue; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQSecurityException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.pool.ReadWriteRunnable; -import org.apache.qpid.pool.ReferenceCountingExecutorService; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.configuration.ConfigStore; -import org.apache.qpid.server.configuration.ConfiguredObject; -import org.apache.qpid.server.configuration.QueueConfigType; -import org.apache.qpid.server.configuration.QueueConfiguration; -import org.apache.qpid.server.configuration.SessionConfig; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.QueueActor; -import org.apache.qpid.server.logging.messages.QueueMessages; -import org.apache.qpid.server.logging.subjects.QueueLogSubject; -import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.security.PrincipalHolder; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionList; -import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.server.txn.LocalTransaction; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import javax.management.JMException; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.EnumSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener -{ - private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class); - - - private final VirtualHost _virtualHost; - - private final AMQShortString _name; - private final String _resourceName; - - /** null means shared */ - private final AMQShortString _owner; - - private PrincipalHolder _prinicpalHolder; - - private boolean _exclusive = false; - private AMQSessionModel _exclusiveOwner; - - - private final boolean _durable; - - /** If true, this queue is deleted when the last subscriber is removed */ - private final boolean _autoDelete; - - private Exchange _alternateExchange; - - /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */ - - - - protected final QueueEntryList _entries; - - protected final SubscriptionList _subscriptionList = new SubscriptionList(this); - - private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead()); - - private volatile Subscription _exclusiveSubscriber; - - - - private final AtomicInteger _atomicQueueCount = new AtomicInteger(0); - - private final AtomicLong _atomicQueueSize = new AtomicLong(0L); - - private final AtomicInteger _activeSubscriberCount = new AtomicInteger(); - - private final AtomicLong _totalMessagesReceived = new AtomicLong(); - - private final AtomicLong _dequeueCount = new AtomicLong(); - private final AtomicLong _dequeueSize = new AtomicLong(); - private final AtomicLong _enqueueSize = new AtomicLong(); - private final AtomicLong _persistentMessageEnqueueSize = new AtomicLong(); - private final AtomicLong _persistentMessageDequeueSize = new AtomicLong(); - private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong(); - private final AtomicLong _persistentMessageDequeueCount = new AtomicLong(); - private final AtomicInteger _counsumerCountHigh = new AtomicInteger(0); - private final AtomicLong _msgTxnEnqueues = new AtomicLong(0); - private final AtomicLong _byteTxnEnqueues = new AtomicLong(0); - private final AtomicLong _msgTxnDequeues = new AtomicLong(0); - private final AtomicLong _byteTxnDequeues = new AtomicLong(0); - private final AtomicLong _unackedMsgCount = new AtomicLong(0); - private final AtomicLong _unackedMsgCountHigh = new AtomicLong(0); - - private final AtomicInteger _bindingCountHigh = new AtomicInteger(); - - /** max allowed size(KB) of a single message */ - public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize(); - - /** max allowed number of messages on a queue. */ - public long _maximumMessageCount = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageCount(); - - /** max queue depth for the queue */ - public long _maximumQueueDepth = ApplicationRegistry.getInstance().getConfiguration().getMaximumQueueDepth(); - - /** maximum message age before alerts occur */ - public long _maximumMessageAge = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageAge(); - - /** the minimum interval between sending out consecutive alerts of the same type */ - public long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap(); - - private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity(); - - private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity(); - - private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class); - - - static final int MAX_ASYNC_DELIVERIES = 10; - - - private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE); - private AtomicReference<Runnable> _asynchronousRunner = new AtomicReference<Runnable>(null); - private final Executor _asyncDelivery; - private AtomicInteger _deliveredMessages = new AtomicInteger(); - private AtomicBoolean _stopped = new AtomicBoolean(false); - - private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>(); - - private final AtomicBoolean _deleted = new AtomicBoolean(false); - private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>(); - - - private LogSubject _logSubject; - private LogActor _logActor; - - private AMQQueueMBean _managedObject; - private static final String SUB_FLUSH_RUNNER = "SUB_FLUSH_RUNNER"; - private boolean _nolocal; - - private final AtomicBoolean _overfull = new AtomicBoolean(false); - private boolean _deleteOnNoConsumers; - private final CopyOnWriteArrayList<Binding> _bindings = new CopyOnWriteArrayList<Binding>(); - private UUID _id; - private final Map<String, Object> _arguments; - - //TODO : persist creation time - private long _createTime = System.currentTimeMillis(); - private ConfigurationPlugin _queueConfiguration; - - - - protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments) - { - this(name, durable, owner, autoDelete, exclusive, virtualHost,new SimpleQueueEntryList.Factory(), arguments); - } - - public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments) - { - this(queueName, durable, owner, autoDelete, exclusive, virtualHost, new SimpleQueueEntryList.Factory(), arguments); - } - - public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String, Object> arguments) - { - this(queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner), autoDelete, exclusive, virtualHost, entryListFactory, arguments); - } - - protected SimpleAMQQueue(AMQShortString name, - boolean durable, - AMQShortString owner, - boolean autoDelete, - boolean exclusive, - VirtualHost virtualHost, - QueueEntryListFactory entryListFactory, - Map<String,Object> arguments) - { - - if (name == null) - { - throw new IllegalArgumentException("Queue name must not be null"); - } - - if (virtualHost == null) - { - throw new IllegalArgumentException("Virtual Host must not be null"); - } - - _name = name; - _resourceName = String.valueOf(name); - _durable = durable; - _owner = owner; - _autoDelete = autoDelete; - _exclusive = exclusive; - _virtualHost = virtualHost; - _entries = entryListFactory.createQueueEntryList(this); - _arguments = arguments; - - _id = virtualHost.getConfigStore().createId(); - - _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); - - _logSubject = new QueueLogSubject(this); - _logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger()); - - // Log the correct creation message - - // Extract the number of priorities for this Queue. - // Leave it as 0 if we are a SimpleQueueEntryList - int priorities = 0; - if (entryListFactory instanceof PriorityQueueList.Factory) - { - priorities = ((PriorityQueueList)_entries).getPriorities(); - } - - // Log the creation of this Queue. - // The priorities display is toggled on if we set priorities > 0 - CurrentActor.get().message(_logSubject, - QueueMessages.CREATED(String.valueOf(_owner), - priorities, - _owner != null, - autoDelete, - durable, !durable, - priorities > 0)); - - getConfigStore().addConfiguredObject(this); - - try - { - _managedObject = new AMQQueueMBean(this); - _managedObject.register(); - } - catch (JMException e) - { - _logger.error("AMQQueue MBean creation has failed ", e); - } - - resetNotifications(); - - } - - public void resetNotifications() - { - // This ensure that the notification checks for the configured alerts are created. - setMaximumMessageAge(_maximumMessageAge); - setMaximumMessageCount(_maximumMessageCount); - setMaximumMessageSize(_maximumMessageSize); - setMaximumQueueDepth(_maximumQueueDepth); - } - - // ------ Getters and Setters - - public void execute(ReadWriteRunnable runnable) - { - _asyncDelivery.execute(runnable); - } - - public AMQShortString getNameShortString() - { - return _name; - } - - public void setNoLocal(boolean nolocal) - { - _nolocal = nolocal; - } - - public UUID getId() - { - return _id; - } - - public QueueConfigType getConfigType() - { - return QueueConfigType.getInstance(); - } - - public ConfiguredObject getParent() - { - return getVirtualHost(); - } - - public boolean isDurable() - { - return _durable; - } - - public boolean isExclusive() - { - return _exclusive; - } - - public void setExclusive(boolean exclusive) throws AMQException - { - _exclusive = exclusive; - - if(isDurable()) - { - getVirtualHost().getDurableConfigurationStore().updateQueue(this); - } - } - - public Exchange getAlternateExchange() - { - return _alternateExchange; - } - - public void setAlternateExchange(Exchange exchange) - { - if(_alternateExchange != null) - { - _alternateExchange.removeReference(this); - } - if(exchange != null) - { - exchange.addReference(this); - } - _alternateExchange = exchange; - } - - public Map<String, Object> getArguments() - { - return _arguments; - } - - public boolean isAutoDelete() - { - return _autoDelete; - } - - public AMQShortString getOwner() - { - return _owner; - } - - public PrincipalHolder getPrincipalHolder() - { - return _prinicpalHolder; - } - - public void setPrincipalHolder(PrincipalHolder prinicpalHolder) - { - _prinicpalHolder = prinicpalHolder; - } - - - public VirtualHost getVirtualHost() - { - return _virtualHost; - } - - public String getName() - { - return getNameShortString().toString(); - } - - // ------ Manage Subscriptions - - public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive) - throws AMQSecurityException, ExistingExclusiveSubscription, ExistingSubscriptionPreventsExclusive - { - // Access control - if (!getVirtualHost().getSecurityManager().authoriseConsume(this)) - { - throw new AMQSecurityException("Permission denied"); - } - - - if (hasExclusiveSubscriber()) - { - throw new ExistingExclusiveSubscription(); - } - - if (exclusive && !subscription.isTransient()) - { - if (getConsumerCount() != 0) - { - throw new ExistingSubscriptionPreventsExclusive(); - } - else - { - _exclusiveSubscriber = subscription; - } - } - - _activeSubscriberCount.incrementAndGet(); - subscription.setStateListener(this); - subscription.setQueueContext(new QueueContext(_entries.getHead())); - - if (!isDeleted()) - { - subscription.setQueue(this, exclusive); - if(_nolocal) - { - subscription.setNoLocal(_nolocal); - } - _subscriptionList.add(subscription); - - //Increment consumerCountHigh if necessary. (un)registerSubscription are both - //synchronized methods so we don't need additional synchronization here - if(_counsumerCountHigh.get() < getConsumerCount()) - { - _counsumerCountHigh.incrementAndGet(); - } - - if (isDeleted()) - { - subscription.queueDeleted(this); - } - } - else - { - // TODO - } - - deliverAsync(subscription); - - } - - public synchronized void unregisterSubscription(final Subscription subscription) throws AMQException - { - if (subscription == null) - { - throw new NullPointerException("subscription argument is null"); - } - - boolean removed = _subscriptionList.remove(subscription); - - if (removed) - { - subscription.close(); - // No longer can the queue have an exclusive consumer - setExclusiveSubscriber(null); - subscription.setQueueContext(null); - - // auto-delete queues must be deleted if there are no remaining subscribers - - if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 ) - { - if (_logger.isInfoEnabled()) - { - _logger.info("Auto-deleteing queue:" + this); - } - - delete(); - - // we need to manually fire the event to the removed subscription (which was the last one left for this - // queue. This is because the delete method uses the subscription set which has just been cleared - subscription.queueDeleted(this); - } - } - - } - - public boolean getDeleteOnNoConsumers() - { - return _deleteOnNoConsumers; - } - - public void setDeleteOnNoConsumers(boolean b) - { - _deleteOnNoConsumers = b; - } - - public void addBinding(final Binding binding) - { - _bindings.add(binding); - int bindingCount = _bindings.size(); - int bindingCountHigh; - while(bindingCount > (bindingCountHigh = _bindingCountHigh.get())) - { - if(_bindingCountHigh.compareAndSet(bindingCountHigh, bindingCount)) - { - break; - } - } - - reconfigure(); - } - - private void reconfigure() - { - //Reconfigure the queue for to reflect this new binding. - ConfigurationPlugin config = getVirtualHost().getConfiguration().getQueueConfiguration(this); - - if (_logger.isDebugEnabled()) - { - _logger.debug("Reconfiguring queue(" + this + ") with config:" + config + " was "+ _queueConfiguration); - } - - if (config != null) - { - // Reconfigure with new config. - configure(config); - } - } - - public int getBindingCountHigh() - { - return _bindingCountHigh.get(); - } - - public void removeBinding(final Binding binding) - { - _bindings.remove(binding); - - reconfigure(); - } - - public List<Binding> getBindings() - { - return Collections.unmodifiableList(_bindings); - } - - public int getBindingCount() - { - return getBindings().size(); - } - - public LogSubject getLogSubject() - { - return _logSubject; - } - - // ------ Enqueue / Dequeue - public void enqueue(ServerMessage message) throws AMQException - { - enqueue(message, null); - } - - public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException - { - incrementTxnEnqueueStats(message); - incrementQueueCount(); - incrementQueueSize(message); - _totalMessagesReceived.incrementAndGet(); - - - QueueEntry entry; - Subscription exclusiveSub = _exclusiveSubscriber; - - if (exclusiveSub != null) - { - exclusiveSub.getSendLock(); - - try - { - entry = _entries.add(message); - - deliverToSubscription(exclusiveSub, entry); - } - finally - { - exclusiveSub.releaseSendLock(); - } - } - else - { - entry = _entries.add(message); - /* - - iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message - - */ - SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get(); - SubscriptionList.SubscriptionNode nextNode = node.getNext(); - if (nextNode == null) - { - nextNode = _subscriptionList.getHead().getNext(); - } - while (nextNode != null) - { - if (_lastSubscriptionNode.compareAndSet(node, nextNode)) - { - break; - } - else - { - node = _lastSubscriptionNode.get(); - nextNode = node.getNext(); - if (nextNode == null) - { - nextNode = _subscriptionList.getHead().getNext(); - } - } - } - - // always do one extra loop after we believe we've finished - // this catches the case where we *just* miss an update - int loops = 2; - - while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0) - { - if (nextNode == null) - { - loops--; - nextNode = _subscriptionList.getHead(); - } - else - { - // if subscription at end, and active, offer - Subscription sub = nextNode.getSubscription(); - deliverToSubscription(sub, entry); - } - nextNode = nextNode.getNext(); - - } - } - - - if (!(entry.isAcquired() || entry.isDeleted())) - { - checkSubscriptionsNotAheadOfDelivery(entry); - - deliverAsync(); - } - - if(_managedObject != null) - { - _managedObject.checkForNotification(entry.getMessage()); - } - - if(action != null) - { - action.onEnqueue(entry); - } - - } - - private void deliverToSubscription(final Subscription sub, final QueueEntry entry) - throws AMQException - { - - sub.getSendLock(); - try - { - if (subscriptionReadyAndHasInterest(sub, entry) - && !sub.isSuspended()) - { - if (!sub.wouldSuspend(entry)) - { - if (sub.acquires() && !entry.acquire(sub)) - { - // restore credit here that would have been taken away by wouldSuspend since we didn't manage - // to acquire the entry for this subscription - sub.onDequeue(entry); - } - else - { - deliverMessage(sub, entry); - } - } - } - } - finally - { - sub.releaseSendLock(); - } - } - - protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry) - { - // This method is only required for queues which mess with ordering - // Simple Queues don't :-) - } - - private void incrementQueueSize(final ServerMessage message) - { - long size = message.getSize(); - getAtomicQueueSize().addAndGet(size); - _enqueueSize.addAndGet(size); - if(message.isPersistent() && isDurable()) - { - _persistentMessageEnqueueSize.addAndGet(size); - _persistentMessageEnqueueCount.incrementAndGet(); - } - } - - private void incrementQueueCount() - { - getAtomicQueueCount().incrementAndGet(); - } - - private void incrementTxnEnqueueStats(final ServerMessage message) - { - SessionConfig session = message.getSessionConfig(); - - if(session !=null && session.isTransactional()) - { - _msgTxnEnqueues.incrementAndGet(); - _byteTxnEnqueues.addAndGet(message.getSize()); - } - } - - private void incrementTxnDequeueStats(QueueEntry entry) - { - _msgTxnDequeues.incrementAndGet(); - _byteTxnDequeues.addAndGet(entry.getSize()); - } - - private void deliverMessage(final Subscription sub, final QueueEntry entry) - throws AMQException - { - setLastSeenEntry(sub, entry); - - _deliveredMessages.incrementAndGet(); - incrementUnackedMsgCount(); - - sub.send(entry); - } - - private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException - { - return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry); - } - - - private void setLastSeenEntry(final Subscription sub, final QueueEntry entry) - { - QueueContext subContext = (QueueContext) sub.getQueueContext(); - QueueEntry releasedEntry = subContext._releasedEntry; - - QueueContext._lastSeenUpdater.set(subContext, entry); - if(releasedEntry == entry) - { - QueueContext._releasedUpdater.compareAndSet(subContext, releasedEntry, null); - } - } - - private void updateSubRequeueEntry(final Subscription sub, final QueueEntry entry) - { - - QueueContext subContext = (QueueContext) sub.getQueueContext(); - if(subContext != null) - { - QueueEntry oldEntry; - - while((oldEntry = subContext._releasedEntry) == null || oldEntry.compareTo(entry) > 0) - { - if(QueueContext._releasedUpdater.compareAndSet(subContext, oldEntry, entry)) - { - break; - } - } - } - } - - public void requeue(QueueEntry entry) - { - - SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); - // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards - while (subscriberIter.advance() && entry.isAvailable()) - { - Subscription sub = subscriberIter.getNode().getSubscription(); - - // we don't make browsers send the same stuff twice - if (sub.seesRequeues()) - { - updateSubRequeueEntry(sub, entry); - } - } - - deliverAsync(); - - } - - public void dequeue(QueueEntry entry, Subscription sub) - { - decrementQueueCount(); - decrementQueueSize(entry); - if (entry.acquiredBySubscription()) - { - _deliveredMessages.decrementAndGet(); - } - - if(sub != null && sub.isSessionTransactional()) - { - incrementTxnDequeueStats(entry); - } - - checkCapacity(); - - } - - private void decrementQueueSize(final QueueEntry entry) - { - final ServerMessage message = entry.getMessage(); - long size = message.getSize(); - getAtomicQueueSize().addAndGet(-size); - _dequeueSize.addAndGet(size); - if(message.isPersistent() && isDurable()) - { - _persistentMessageDequeueSize.addAndGet(size); - _persistentMessageDequeueCount.incrementAndGet(); - } - } - - void decrementQueueCount() - { - getAtomicQueueCount().decrementAndGet(); - _dequeueCount.incrementAndGet(); - } - - public boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException - { - /* TODO : This is wrong as the subscription may be suspended, we should instead change the state of the message - entry to resend and move back the subscription pointer. */ - - subscription.getSendLock(); - try - { - if (!subscription.isClosed()) - { - deliverMessage(subscription, entry); - return true; - } - else - { - return false; - } - } - finally - { - subscription.releaseSendLock(); - } - } - - public int getConsumerCount() - { - return _subscriptionList.size(); - } - - public int getConsumerCountHigh() - { - return _counsumerCountHigh.get(); - } - - public int getActiveConsumerCount() - { - return _activeSubscriberCount.get(); - } - - public boolean isUnused() - { - return getConsumerCount() == 0; - } - - public boolean isEmpty() - { - return getMessageCount() == 0; - } - - public int getMessageCount() - { - return getAtomicQueueCount().get(); - } - - public long getQueueDepth() - { - return getAtomicQueueSize().get(); - } - - public int getUndeliveredMessageCount() - { - int count = getMessageCount() - _deliveredMessages.get(); - if (count < 0) - { - return 0; - } - else - { - return count; - } - } - - public long getReceivedMessageCount() - { - return _totalMessagesReceived.get(); - } - - public long getOldestMessageArrivalTime() - { - QueueEntry entry = getOldestQueueEntry(); - return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime(); - } - - protected QueueEntry getOldestQueueEntry() - { - return _entries.next(_entries.getHead()); - } - - public boolean isDeleted() - { - return _deleted.get(); - } - - public List<QueueEntry> getMessagesOnTheQueue() - { - ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>(); - QueueEntryIterator queueListIterator = _entries.iterator(); - while (queueListIterator.advance()) - { - QueueEntry node = queueListIterator.getNode(); - if (node != null && !node.isDeleted()) - { - entryList.add(node); - } - } - return entryList; - - } - - public void stateChange(Subscription sub, Subscription.State oldState, Subscription.State newState) - { - if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE) - { - _activeSubscriberCount.decrementAndGet(); - - } - else if (newState == Subscription.State.ACTIVE) - { - if (oldState != Subscription.State.ACTIVE) - { - _activeSubscriberCount.incrementAndGet(); - - } - deliverAsync(sub); - } - } - - public int compareTo(final AMQQueue o) - { - return _name.compareTo(o.getNameShortString()); - } - - public AtomicInteger getAtomicQueueCount() - { - return _atomicQueueCount; - } - - public AtomicLong getAtomicQueueSize() - { - return _atomicQueueSize; - } - - public boolean hasExclusiveSubscriber() - { - return _exclusiveSubscriber != null; - } - - private void setExclusiveSubscriber(Subscription exclusiveSubscriber) - { - _exclusiveSubscriber = exclusiveSubscriber; - } - - public static interface QueueEntryFilter - { - public boolean accept(QueueEntry entry); - - public boolean filterComplete(); - } - - public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId) - { - return getMessagesOnTheQueue(new QueueEntryFilter() - { - - public boolean accept(QueueEntry entry) - { - final long messageId = entry.getMessage().getMessageNumber(); - return messageId >= fromMessageId && messageId <= toMessageId; - } - - public boolean filterComplete() - { - return false; - } - }); - } - - public QueueEntry getMessageOnTheQueue(final long messageId) - { - List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() - { - private boolean _complete; - - public boolean accept(QueueEntry entry) - { - _complete = entry.getMessage().getMessageNumber() == messageId; - return _complete; - } - - public boolean filterComplete() - { - return _complete; - } - }); - return entries.isEmpty() ? null : entries.get(0); - } - - public List<QueueEntry> getMessagesOnTheQueue(QueueEntryFilter filter) - { - ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>(); - QueueEntryIterator queueListIterator = _entries.iterator(); - while (queueListIterator.advance() && !filter.filterComplete()) - { - QueueEntry node = queueListIterator.getNode(); - if (!node.isDeleted() && filter.accept(node)) - { - entryList.add(node); - } - } - return entryList; - - } - - /** - * Returns a list of QueEntries from a given range of queue positions, eg messages 5 to 10 on the queue. - * - * The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1. - * Using 0 in the 'to' field will return an empty list regardless of the 'from' value. - * @param fromPosition - * @param toPosition - * @return - */ - public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition) - { - return getMessagesOnTheQueue(new QueueEntryFilter() - { - private long position = 0; - - public boolean accept(QueueEntry entry) - { - position++; - return (position >= fromPosition) && (position <= toPosition); - } - - public boolean filterComplete() - { - return position >= toPosition; - } - }); - - } - - public void moveMessagesToAnotherQueue(final long fromMessageId, - final long toMessageId, - String queueName, - ServerTransaction txn) throws IllegalArgumentException - { - - final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); - if (toQueue == null) - { - throw new IllegalArgumentException("Queue '" + queueName + "' is not registered with the virtualhost."); - } - else if (toQueue == this) - { - throw new IllegalArgumentException("The destination queue cant be the same as the source queue"); - } - - List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() - { - - public boolean accept(QueueEntry entry) - { - final long messageId = entry.getMessage().getMessageNumber(); - return (messageId >= fromMessageId) - && (messageId <= toMessageId) - && entry.acquire(); - } - - public boolean filterComplete() - { - return false; - } - }); - - - - // Move the messages in on the message store. - for (final QueueEntry entry : entries) - { - final ServerMessage message = entry.getMessage(); - txn.enqueue(toQueue, message, - new ServerTransaction.Action() - { - - public void postCommit() - { - try - { - toQueue.enqueue(message); - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - } - - public void onRollback() - { - entry.release(); - } - }); - txn.dequeue(this, message, - new ServerTransaction.Action() - { - - public void postCommit() - { - entry.discard(); - } - - public void onRollback() - { - - } - }); - - } - - } - - public void copyMessagesToAnotherQueue(final long fromMessageId, - final long toMessageId, - String queueName, - final ServerTransaction txn) throws IllegalArgumentException - { - final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); - if (toQueue == null) - { - throw new IllegalArgumentException("Queue '" + queueName + "' is not registered with the virtualhost."); - } - else if (toQueue == this) - { - throw new IllegalArgumentException("The destination queue cant be the same as the source queue"); - } - - List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() - { - - public boolean accept(QueueEntry entry) - { - final long messageId = entry.getMessage().getMessageNumber(); - return ((messageId >= fromMessageId) - && (messageId <= toMessageId)); - } - - public boolean filterComplete() - { - return false; - } - }); - - - // Move the messages in on the message store. - for (QueueEntry entry : entries) - { - final ServerMessage message = entry.getMessage(); - - txn.enqueue(toQueue, message, new ServerTransaction.Action() - { - public void postCommit() - { - try - { - toQueue.enqueue(message); - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - } - - public void onRollback() - { - - } - }); - - } - - } - - public void removeMessagesFromQueue(long fromMessageId, long toMessageId) - { - - QueueEntryIterator queueListIterator = _entries.iterator(); - - while (queueListIterator.advance()) - { - QueueEntry node = queueListIterator.getNode(); - - final ServerMessage message = node.getMessage(); - if(message != null) - { - final long messageId = message.getMessageNumber(); - - if ((messageId >= fromMessageId) - && (messageId <= toMessageId) - && !node.isDeleted() - && node.acquire()) - { - dequeueEntry(node); - } - } - } - - } - - public void purge(final long request) throws AMQException - { - clear(request); - } - - public long getCreateTime() - { - return _createTime; - } - - // ------ Management functions - - public void deleteMessageFromTop() - { - QueueEntryIterator queueListIterator = _entries.iterator(); - boolean noDeletes = true; - - while (noDeletes && queueListIterator.advance()) - { - QueueEntry node = queueListIterator.getNode(); - if (!node.isDeleted() && node.acquire()) - { - dequeueEntry(node); - noDeletes = false; - } - - } - } - - public long clearQueue() throws AMQException - { - return clear(0l); - } - - private long clear(final long request) throws AMQSecurityException - { - //Perform ACLs - if (!getVirtualHost().getSecurityManager().authorisePurge(this)) - { - throw new AMQSecurityException("Permission denied: queue " + getName()); - } - - QueueEntryIterator queueListIterator = _entries.iterator(); - long count = 0; - - ServerTransaction txn = new LocalTransaction(getVirtualHost().getTransactionLog()); - - while (queueListIterator.advance()) - { - QueueEntry node = queueListIterator.getNode(); - if (!node.isDeleted() && node.acquire()) - { - dequeueEntry(node, txn); - if(++count == request) - { - break; - } - } - - } - - txn.commit(); - - return count; - } - - private void dequeueEntry(final QueueEntry node) - { - ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getTransactionLog()); - dequeueEntry(node, txn); - } - - private void dequeueEntry(final QueueEntry node, ServerTransaction txn) - { - txn.dequeue(this, node.getMessage(), - new ServerTransaction.Action() - { - - public void postCommit() - { - node.discard(); - } - - public void onRollback() - { - - } - }); - } - - public void addQueueDeleteTask(final Task task) - { - _deleteTaskList.add(task); - } - - public void removeQueueDeleteTask(final Task task) - { - _deleteTaskList.remove(task); - } - - // TODO list all thrown exceptions - public int delete() throws AMQSecurityException, AMQException - { - // Check access - if (!_virtualHost.getSecurityManager().authoriseDelete(this)) - { - throw new AMQSecurityException("Permission denied: " + getName()); - } - - if (!_deleted.getAndSet(true)) - { - - for (Binding b : getBindings()) - { - _virtualHost.getBindingFactory().removeBinding(b); - } - - SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator(); - - while (subscriptionIter.advance()) - { - Subscription s = subscriptionIter.getNode().getSubscription(); - if (s != null) - { - s.queueDeleted(this); - } - } - - _virtualHost.getQueueRegistry().unregisterQueue(_name); - getConfigStore().removeConfiguredObject(this); - - List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() - { - - public boolean accept(QueueEntry entry) - { - return entry.acquire(); - } - - public boolean filterComplete() - { - return false; - } - }); - - ServerTransaction txn = new LocalTransaction(getVirtualHost().getTransactionLog()); - - if(_alternateExchange != null) - { - - InboundMessageAdapter adapter = new InboundMessageAdapter(); - for(final QueueEntry entry : entries) - { - adapter.setEntry(entry); - final List<? extends BaseQueue> rerouteQueues = _alternateExchange.route(adapter); - final ServerMessage message = entry.getMessage(); - if(rerouteQueues != null && rerouteQueues.size() != 0) - { - txn.enqueue(rerouteQueues, entry.getMessage(), - new ServerTransaction.Action() - { - - public void postCommit() - { - try - { - for(BaseQueue queue : rerouteQueues) - { - queue.enqueue(message); - } - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - - } - - public void onRollback() - { - - } - }); - txn.dequeue(this, entry.getMessage(), - new ServerTransaction.Action() - { - - public void postCommit() - { - entry.discard(); - } - - public void onRollback() - { - } - }); - } - - } - - _alternateExchange.removeReference(this); - } - else - { - // TODO log discard - - for(final QueueEntry entry : entries) - { - final ServerMessage message = entry.getMessage(); - if(message != null) - { - txn.dequeue(this, message, - new ServerTransaction.Action() - { - - public void postCommit() - { - entry.discard(); - } - - public void onRollback() - { - } - }); - } - } - } - - txn.commit(); - - - if(_managedObject!=null) - { - _managedObject.unregister(); - } - - for (Task task : _deleteTaskList) - { - task.doTask(this); - } - - _deleteTaskList.clear(); - stop(); - - //Log Queue Deletion - CurrentActor.get().message(_logSubject, QueueMessages.DELETED()); - - } - return getMessageCount(); - - } - - public void stop() - { - if (!_stopped.getAndSet(true)) - { - ReferenceCountingExecutorService.getInstance().releaseExecutorService(); - } - } - - public void checkCapacity(AMQChannel channel) - { - if(_capacity != 0l) - { - if(_atomicQueueSize.get() > _capacity) - { - _overfull.set(true); - //Overfull log message - _logActor.message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity)); - - if(_blockedChannels.putIfAbsent(channel, Boolean.TRUE)==null) - { - channel.block(this); - } - - if(_atomicQueueSize.get() <= _flowResumeCapacity) - { - - //Underfull log message - _logActor.message(_logSubject, QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity)); - - channel.unblock(this); - _blockedChannels.remove(channel); - - } - - } - - - - } - } - - private void checkCapacity() - { - if(_capacity != 0L) - { - if(_overfull.get() && _atomicQueueSize.get() <= _flowResumeCapacity) - { - if(_overfull.compareAndSet(true,false)) - {//Underfull log message - _logActor.message(_logSubject, QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity)); - } - - - for(AMQChannel c : _blockedChannels.keySet()) - { - c.unblock(this); - _blockedChannels.remove(c); - } - } - } - } - - - public void deliverAsync() - { - QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet()); - - if (_asynchronousRunner.compareAndSet(null, runner)) - { - _asyncDelivery.execute(runner); - } - } - - public void deliverAsync(Subscription sub) - { - SubFlushRunner flusher = (SubFlushRunner) sub.get(SUB_FLUSH_RUNNER); - if(flusher == null) - { - flusher = new SubFlushRunner(sub); - sub.set(SUB_FLUSH_RUNNER, flusher); - } - _asyncDelivery.execute(flusher); - } - - public void flushSubscription(Subscription sub) throws AMQException - { - // Access control - if (!getVirtualHost().getSecurityManager().authoriseConsume(this)) - { - throw new AMQSecurityException("Permission denied: " + getName()); - } - flushSubscription(sub, Long.MAX_VALUE); - } - - public boolean flushSubscription(Subscription sub, long iterations) throws AMQException - { - boolean atTail = false; - - while (!sub.isSuspended() && !atTail && iterations != 0) - { - try - { - sub.getSendLock(); - atTail = attemptDelivery(sub); - if (atTail && sub.isAutoClose()) - { - unregisterSubscription(sub); - - sub.confirmAutoClose(); - - } - else if (!atTail) - { - iterations--; - } - } - finally - { - sub.releaseSendLock(); - } - } - - // if there's (potentially) more than one subscription the others will potentially not have been advanced to the - // next entry they are interested in yet. This would lead to holding on to references to expired messages, etc - // which would give us memory "leak". - - if (!hasExclusiveSubscriber()) - { - advanceAllSubscriptions(); - } - return atTail; - } - - /** - * Attempt delivery for the given subscription. - * - * Looks up the next node for the subscription and attempts to deliver it. - * - * @param sub - * @return true if we have completed all possible deliveries for this sub. - * @throws AMQException - */ - private boolean attemptDelivery(Subscription sub) throws AMQException - { - boolean atTail = false; - - boolean subActive = sub.isActive() && !sub.isSuspended(); - if (subActive) - { - - QueueEntry node = getNextAvailableEntry(sub); - - if (node != null && !(node.isAcquired() || node.isDeleted())) - { - if (sub.hasInterest(node)) - { - if (!sub.wouldSuspend(node)) - { - if (sub.acquires() && !node.acquire(sub)) - { - sub.onDequeue(node); - } - else - { - deliverMessage(sub, node); - } - - } - else // Not enough Credit for message and wouldSuspend - { - //QPID-1187 - Treat the subscription as suspended for this message - // and wait for the message to be removed to continue delivery. - subActive = false; - node.addStateChangeListener(new QueueEntryListener(sub)); - } - } - - } - atTail = (node == null) || (_entries.next(node) == null); - } - return atTail || !subActive; - } - - protected void advanceAllSubscriptions() throws AMQException - { - SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); - while (subscriberIter.advance()) - { - SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode(); - Subscription sub = subNode.getSubscription(); - if(sub.acquires()) - { - getNextAvailableEntry(sub); - } - else - { - // TODO - } - } - } - - private QueueEntry getNextAvailableEntry(final Subscription sub) - throws AMQException - { - QueueContext context = (QueueContext) sub.getQueueContext(); - if(context != null) - { - QueueEntry lastSeen = context._lastSeenEntry; - QueueEntry releasedNode = context._releasedEntry; - - QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen); - - boolean expired = false; - while (node != null && (node.isAcquired() || node.isDeleted() || (expired = node.expired()) || !sub.hasInterest(node))) - { - if (expired) - { - expired = false; - if (node.acquire()) - { - dequeueEntry(node); - } - } - - if(QueueContext._lastSeenUpdater.compareAndSet(context, lastSeen, node)) - { - QueueContext._releasedUpdater.compareAndSet(context, releasedNode, null); - } - - lastSeen = context._lastSeenEntry; - releasedNode = context._releasedEntry; - node = (releasedNode != null && lastSeen.compareTo(releasedNode)>0) ? releasedNode : _entries.next(lastSeen); - } - return node; - } - else - { - return null; - } - } - - - /** - * Used by queue Runners to asynchronously deliver messages to consumers. - * - * A queue Runner is started whenever a state change occurs, e.g when a new - * message arrives on the queue and cannot be immediately delivered to a - * subscription (i.e. asynchronous delivery is required). Unless there are - * SubFlushRunners operating (due to subscriptions unsuspending) which are - * capable of accepting/delivering all messages then these messages would - * otherwise remain on the queue. - * - * processQueue should be running while there are messages on the queue AND - * there are subscriptions that can deliver them. If there are no - * subscriptions capable of delivering the remaining messages on the queue - * then processQueue should stop to prevent spinning. - * - * Since processQueue is runs in a fixed size Executor, it should not run - * indefinitely to prevent starving other tasks of CPU (e.g jobs to process - * incoming messages may not be able to be scheduled in the thread pool - * because all threads are working on clearing down large queues). To solve - * this problem, after an arbitrary number of message deliveries the - * processQueue job stops iterating, resubmits itself to the executor, and - * ends the current instance - * - * @param runner the Runner to schedule - * @throws AMQException - */ - public void processQueue(QueueRunner runner) throws AMQException - { - long stateChangeCount; - long previousStateChangeCount = Long.MIN_VALUE; - boolean deliveryIncomplete = true; - - boolean lastLoop = false; - int iterations = MAX_ASYNC_DELIVERIES; - - _asynchronousRunner.compareAndSet(runner, null); - - // For every message enqueue/requeue the we fire deliveryAsync() which - // increases _stateChangeCount. If _sCC changes whilst we are in our loop - // (detected by setting previousStateChangeCount to stateChangeCount in the loop body) - // then we will continue to run for a maximum of iterations. - // So whilst delivery/rejection is going on a processQueue thread will be running - while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner)) - { - // we want to have one extra loop after every subscription has reached the point where it cannot move - // further, just in case the advance of one subscription in the last loop allows a different subscription to - // move forward in the next iteration - - if (previousStateChangeCount != stateChangeCount) - { - //further asynchronous delivery is required since the - //previous loop. keep going if iteration slicing allows. - lastLoop = false; - } - - previousStateChangeCount = stateChangeCount; - boolean allSubscriptionsDone = true; - boolean subscriptionDone; - - SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator(); - //iterate over the subscribers and try to advance their pointer - while (subscriptionIter.advance()) - { - Subscription sub = subscriptionIter.getNode().getSubscription(); - sub.getSendLock(); - try - { - //attempt delivery. returns true if no further delivery currently possible to this sub - subscriptionDone = attemptDelivery(sub); - if (subscriptionDone) - { - //close autoClose subscriptions if we are not currently intent on continuing - if (lastLoop && sub.isAutoClose()) - { - unregisterSubscription(sub); - - sub.confirmAutoClose(); - } - } - else - { - //this subscription can accept additional deliveries, so we must - //keep going after this (if iteration slicing allows it) - allSubscriptionsDone = false; - lastLoop = false; - iterations--; - } - } - finally - { - sub.releaseSendLock(); - } - } - - if(allSubscriptionsDone && lastLoop) - { - //We have done an extra loop already and there are again - //again no further delivery attempts possible, only - //keep going if state change demands it. - deliveryIncomplete = false; - } - else if(allSubscriptionsDone) - { - //All subscriptions reported being done, but we have to do - //an extra loop if the iterations are not exhausted and - //there is still any work to be done - deliveryIncomplete = _subscriptionList.size() != 0; - lastLoop = true; - } - else - { - //some subscriptions can still accept more messages, - //keep going if iteration count allows. - lastLoop = false; - deliveryIncomplete = true; - } - - _asynchronousRunner.set(null); - } - - // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit - // therefore we should schedule this runner again (unless someone beats us to it :-) ). - if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner)) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Rescheduling runner:" + runner); - } - _asyncDelivery.execute(runner); - } - } - - public void checkMessageStatus() throws AMQException - { - - QueueEntryIterator queueListIterator = _entries.iterator(); - - while (queueListIterator.advance()) - { - QueueEntry node = queueListIterator.getNode(); - // Only process nodes that are not currently deleted - if (!node.isDeleted()) - { - // If the node has exired then aquire it - if (node.expired() && node.acquire()) - { - // Then dequeue it. - dequeueEntry(node); - } - else - { - if (_managedObject != null) - { - // There is a chance that the node could be deleted by - // the time the check actually occurs. So verify we - // can actually get the message to perform the check. - ServerMessage msg = node.getMessage(); - if (msg != null) - { - _managedObject.checkForNotification(msg); - } - } - } - } - } - - } - - public long getMinimumAlertRepeatGap() - { - return _minimumAlertRepeatGap; - } - - public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap) - { - _minimumAlertRepeatGap = minimumAlertRepeatGap; - } - - public long getMaximumMessageAge() - { - return _maximumMessageAge; - } - - public void setMaximumMessageAge(long maximumMessageAge) - { - _maximumMessageAge = maximumMessageAge; - if (maximumMessageAge == 0L) - { - _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT); - } - else - { - _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT); - } - } - - public long getMaximumMessageCount() - { - return _maximumMessageCount; - } - - public void setMaximumMessageCount(final long maximumMessageCount) - { - _maximumMessageCount = maximumMessageCount; - if (maximumMessageCount == 0L) - { - _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT); - } - else - { - _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT); - } - - } - - public long getMaximumQueueDepth() - { - return _maximumQueueDepth; - } - - // Sets the queue depth, the max queue size - public void setMaximumQueueDepth(final long maximumQueueDepth) - { - _maximumQueueDepth = maximumQueueDepth; - if (maximumQueueDepth == 0L) - { - _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT); - } - else - { - _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT); - } - - } - - public long getMaximumMessageSize() - { - return _maximumMessageSize; - } - - public void setMaximumMessageSize(final long maximumMessageSize) - { - _maximumMessageSize = maximumMessageSize; - if (maximumMessageSize == 0L) - { - _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT); - } - else - { - _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT); - } - } - - public long getCapacity() - { - return _capacity; - } - - public void setCapacity(long capacity) - { - _capacity = capacity; - } - - public long getFlowResumeCapacity() - { - return _flowResumeCapacity; - } - - public void setFlowResumeCapacity(long flowResumeCapacity) - { - _flowResumeCapacity = flowResumeCapacity; - - checkCapacity(); - } - - public boolean isOverfull() - { - return _overfull.get(); - } - - public Set<NotificationCheck> getNotificationChecks() - { - return _notificationChecks; - } - - public ManagedObject getManagedObject() - { - return _managedObject; - } - - private final class QueueEntryListener implements QueueEntry.StateChangeListener - { - - private final Subscription _sub; - - public QueueEntryListener(final Subscription sub) - { - _sub = sub; - } - - public boolean equals(Object o) - { - assert o != null; - assert o instanceof QueueEntryListener; - return _sub == ((QueueEntryListener) o)._sub; - } - - public int hashCode() - { - return System.identityHashCode(_sub); - } - - public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState) - { - entry.removeStateChangeListener(this); - deliverAsync(_sub); - } - } - - public List<Long> getMessagesOnTheQueue(int num) - { - return getMessagesOnTheQueue(num, 0); - } - - public List<Long> getMessagesOnTheQueue(int num, int offset) - { - ArrayList<Long> ids = new ArrayList<Long>(num); - QueueEntryIterator it = _entries.iterator(); - for (int i = 0; i < offset; i++) - { - it.advance(); - } - - for (int i = 0; i < num && !it.atTail(); i++) - { - it.advance(); - ids.add(it.getNode().getMessage().getMessageNumber()); - } - return ids; - } - - public AMQSessionModel getExclusiveOwningSession() - { - return _exclusiveOwner; - } - - public void setExclusiveOwningSession(AMQSessionModel exclusiveOwner) - { - _exclusive = true; - _exclusiveOwner = exclusiveOwner; - } - - - public void configure(ConfigurationPlugin config) - { - if (config != null) - { - if (config instanceof QueueConfiguration) - { - - setMaximumMessageAge(((QueueConfiguration)config).getMaximumMessageAge()); - setMaximumQueueDepth(((QueueConfiguration)config).getMaximumQueueDepth()); - setMaximumMessageSize(((QueueConfiguration)config).getMaximumMessageSize()); - setMaximumMessageCount(((QueueConfiguration)config).getMaximumMessageCount()); - setMinimumAlertRepeatGap(((QueueConfiguration)config).getMinimumAlertRepeatGap()); - _capacity = ((QueueConfiguration)config).getCapacity(); - _flowResumeCapacity = ((QueueConfiguration)config).getFlowResumeCapacity(); - } - - _queueConfiguration = config; - - } - } - - - public ConfigurationPlugin getConfiguration() - { - return _queueConfiguration; - } - - public String getResourceName() - { - return _resourceName; - } - - - public ConfigStore getConfigStore() - { - return getVirtualHost().getConfigStore(); - } - - public long getMessageDequeueCount() - { - return _dequeueCount.get(); - } - - public long getTotalEnqueueSize() - { - return _enqueueSize.get(); - } - - public long getTotalDequeueSize() - { - return _dequeueSize.get(); - } - - public long getByteTxnEnqueues() - { - return _byteTxnEnqueues.get(); - } - - public long getByteTxnDequeues() - { - return _byteTxnDequeues.get(); - } - - public long getMsgTxnEnqueues() - { - return _msgTxnEnqueues.get(); - } - - public long getMsgTxnDequeues() - { - return _msgTxnDequeues.get(); - } - - public long getPersistentByteEnqueues() - { - return _persistentMessageEnqueueSize.get(); - } - - public long getPersistentByteDequeues() - { - return _persistentMessageDequeueSize.get(); - } - - public long getPersistentMsgEnqueues() - { - return _persistentMessageEnqueueCount.get(); - } - - public long getPersistentMsgDequeues() - { - return _persistentMessageDequeueCount.get(); - } - - - @Override - public String toString() - { - return String.valueOf(getNameShortString()); - } - - public long getUnackedMessageCountHigh() - { - return _unackedMsgCountHigh.get(); - } - - public long getUnackedMessageCount() - { - return _unackedMsgCount.get(); - } - - public void decrementUnackedMsgCount() - { - _unackedMsgCount.decrementAndGet(); - } - - private void incrementUnackedMsgCount() - { - long unackedMsgCount = _unackedMsgCount.incrementAndGet(); - - long unackedMsgCountHigh; - while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get())) - { - if(_unackedMsgCountHigh.compareAndSet(unackedMsgCountHigh, unackedMsgCount)) - { - break; - } - } - } - - public LogActor getLogActor() - { - return _logActor; - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java deleted file mode 100644 index b97c2c55c5..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java +++ /dev/null @@ -1,198 +0,0 @@ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.message.ServerMessage; - -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.concurrent.atomic.AtomicLong; - -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -public class SimpleQueueEntryList implements QueueEntryList -{ - - private final QueueEntryImpl _head; - - private volatile QueueEntryImpl _tail; - - static final AtomicReferenceFieldUpdater<SimpleQueueEntryList, QueueEntryImpl> - _tailUpdater = - AtomicReferenceFieldUpdater.newUpdater - (SimpleQueueEntryList.class, QueueEntryImpl.class, "_tail"); - - - private final AMQQueue _queue; - - static final AtomicReferenceFieldUpdater<QueueEntryImpl, QueueEntryImpl> - _nextUpdater = - AtomicReferenceFieldUpdater.newUpdater - (QueueEntryImpl.class, QueueEntryImpl.class, "_next"); - - private AtomicLong _scavenges = new AtomicLong(0L); - private final long _scavengeCount = Integer.getInteger("qpid.queue.scavenge_count", 50); - - - public SimpleQueueEntryList(AMQQueue queue) - { - _queue = queue; - _head = new QueueEntryImpl(this); - _tail = _head; - } - - void advanceHead() - { - QueueEntryImpl next = _head.nextNode(); - QueueEntryImpl newNext = _head.getNext(); - - if (next == newNext) - { - if (_scavenges.incrementAndGet() > _scavengeCount) - { - _scavenges.set(0L); - scavenge(); - } - } - } - - void scavenge() - { - QueueEntryImpl next = _head.getNext(); - - while (next != null) - { - next = next.getNext(); - } - } - - - public AMQQueue getQueue() - { - return _queue; - } - - - public QueueEntry add(ServerMessage message) - { - QueueEntryImpl node = createQueueEntry(message); - for (;;) - { - QueueEntryImpl tail = _tail; - QueueEntryImpl next = tail.nextNode(); - if (tail == _tail) - { - if (next == null) - { - node.setEntryId(tail.getEntryId()+1); - if (_nextUpdater.compareAndSet(tail, null, node)) - { - _tailUpdater.compareAndSet(this, tail, node); - - return node; - } - } - else - { - _tailUpdater.compareAndSet(this,tail, next); - } - } - } - } - - protected QueueEntryImpl createQueueEntry(ServerMessage message) - { - return new QueueEntryImpl(this, message); - } - - public QueueEntry next(QueueEntry node) - { - return ((QueueEntryImpl)node).getNext(); - } - - - public static class QueueEntryIteratorImpl implements QueueEntryIterator - { - - private QueueEntryImpl _lastNode; - - QueueEntryIteratorImpl(QueueEntryImpl startNode) - { - _lastNode = startNode; - } - - - public boolean atTail() - { - return _lastNode.nextNode() == null; - } - - public QueueEntry getNode() - { - - return _lastNode; - - } - - public boolean advance() - { - - if(!atTail()) - { - QueueEntryImpl nextNode = _lastNode.nextNode(); - while(nextNode.isDeleted() && nextNode.nextNode() != null) - { - nextNode = nextNode.nextNode(); - } - _lastNode = nextNode; - return true; - - } - else - { - return false; - } - - } - - } - - - public QueueEntryIterator iterator() - { - return new QueueEntryIteratorImpl(_head); - } - - - public QueueEntry getHead() - { - return _head; - } - - static class Factory implements QueueEntryListFactory - { - - public QueueEntryList createQueueEntryList(AMQQueue queue) - { - return new SimpleQueueEntryList(queue); - } - } - - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java deleted file mode 100755 index 46c1a6af9a..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java +++ /dev/null @@ -1,96 +0,0 @@ -package org.apache.qpid.server.queue; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import org.apache.qpid.pool.ReadWriteRunnable; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.AMQException; -import org.apache.log4j.Logger; - - -class SubFlushRunner implements ReadWriteRunnable -{ - private static final Logger _logger = Logger.getLogger(SubFlushRunner.class); - - - private final Subscription _sub; - private final String _name; - private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES; - - public SubFlushRunner(Subscription sub) - { - _sub = sub; - _name = "SubFlushRunner-"+_sub; - } - - public void run() - { - - String originalName = Thread.currentThread().getName(); - try - { - Thread.currentThread().setName(_name); - - boolean complete = false; - try - { - CurrentActor.set(_sub.getLogActor()); - complete = getQueue().flushSubscription(_sub, ITERATIONS); - - } - catch (AMQException e) - { - _logger.error(e); - } - finally - { - CurrentActor.remove(); - } - if (!complete && !_sub.isSuspended()) - { - getQueue().execute(this); - } - - } - finally - { - Thread.currentThread().setName(originalName); - } - - } - - private SimpleAMQQueue getQueue() - { - return (SimpleAMQQueue) _sub.getQueue(); - } - - public boolean isRead() - { - return false; - } - - public boolean isWrite() - { - return true; - } -} |