diff options
author | Martin Ritchie <ritchiem@apache.org> | 2009-04-14 15:46:39 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2009-04-14 15:46:39 +0000 |
commit | 1b2b4b309e9392e0523cd62accb8704fd089eef8 (patch) | |
tree | 81800138f1c90a88d004b7fad90a69a902ab44dd /java/broker/src/main/java/org/apache/qpid/server/queue | |
parent | 86f2fa80575d0db1fe1395172f6dccf23c7d5018 (diff) | |
download | qpid-python-1b2b4b309e9392e0523cd62accb8704fd089eef8.tar.gz |
QPID-1807 : Remove old broker and FlowToDisk related tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@764838 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue')
38 files changed, 0 insertions, 7280 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java deleted file mode 100644 index 8dac12fe24..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.AMQException; - -import java.util.Iterator; - -public interface AMQMessage -{ - //Get Content relating to this message - - Long getMessageId(); - - Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel); - - Iterator<ContentChunk> getContentBodyIterator(); - - ContentHeaderBody getContentHeaderBody(); - - ContentChunk getContentChunk(int index); - - Object getPublisherClientInstance(); - - Object getPublisherIdentifier(); - - MessagePublishInfo getMessagePublishInfo(); - - int getBodyCount(); - - long getSize(); - - long getArrivalTime(); - - - - //Check the status of this message - - /** Is this a persistent message - * - * @return true if the message is persistent - */ - boolean isPersistent(); - - - boolean isImmediate(); - - - void setClientIdentifier(AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier); - - /** - * This is called when all the content has been received. - * @param storeContext - *@param messagePublishInfo - * @param contentHeaderBody @throws org.apache.qpid.AMQException - */ - void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo, ContentHeaderBody contentHeaderBody) - throws AMQException; - - void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody) - throws AMQException; - - void recoverFromMessageMetaData(MessageMetaData mmd); - - void recoverContentBodyFrame(ContentChunk contentChunk, boolean isLastContentBody) throws AMQException; - - - String toString(); - - String debugIdentity(); - - void setExpiration(long expiration); - - long getExpiration(); -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java deleted file mode 100644 index 00dec57ed5..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java +++ /dev/null @@ -1,77 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.queue; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.subscription.SubscriptionList; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.AMQException; - -public class AMQPriorityQueue extends SimpleAMQQueue -{ - protected AMQPriorityQueue(final AMQShortString name, - final boolean durable, - final AMQShortString owner, - final boolean autoDelete, - final VirtualHost virtualHost, - int priorities) - throws AMQException - { - super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueEntryList.Factory(priorities)); - } - - public int getPriorities() - { - return ((PriorityQueueEntryList) _entries).getPriorities(); - } - - @Override - protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry) - { - // check that all subscriptions are not in advance of the entry - SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator(); - while(subIter.advance() && !entry.isAcquired()) - { - final Subscription subscription = subIter.getNode().getSubscription(); - QueueEntry subnode = subscription.getLastSeenEntry(); - while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired()) - { - if(subscription.setLastSeenEntry(subnode,entry)) - { - break; - } - else - { - subnode = subscription.getLastSeenEntry(); - } - } - - } - } - - @Override - public String getType() - { - return getClass().getSimpleName() + "[" + getName() + "][Priorities:" + getPriorities() + "]"; - } - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java deleted file mode 100644 index fae219e320..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.management.Managable; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.configuration.QueueConfiguration; -import org.apache.qpid.server.configuration.ServerConfiguration; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.AMQException; - -import java.util.List; -import java.util.Set; - -public interface AMQQueue extends Managable, Comparable<AMQQueue> -{ - - AMQShortString getName(); - - boolean isDurable(); - - boolean isAutoDelete(); - - AMQShortString getOwner(); - - VirtualHost getVirtualHost(); - - - void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException; - - void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException; - - List<ExchangeBinding> getExchangeBindings(); - - - void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException; - - void unregisterSubscription(final Subscription subscription) throws AMQException; - - - int getConsumerCount(); - - int getActiveConsumerCount(); - - boolean isUnused(); - - boolean isEmpty(); - - boolean isFlowed(); - - int getMessageCount(); - - int getUndeliveredMessageCount(); - - - long getQueueDepth(); - - long getReceivedMessageCount(); - - long getOldestMessageArrivalTime(); - - boolean isDeleted(); - - int delete() throws AMQException; - - QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException; - - void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException; - - void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException; - - boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException; - - void addQueueDeleteTask(final Task task); - - List<QueueEntry> getMessagesOnTheQueue(); - - List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId); - - List<Long> getMessagesOnTheQueue(int num); - - List<Long> getMessagesOnTheQueue(int num, int offest); - - QueueEntry getMessageOnTheQueue(long messageId); - - - void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, - StoreContext storeContext); - - void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, StoreContext storeContext); - - void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext); - - long getMemoryUsageMaximum(); - - void setMemoryUsageMaximum(long maximumMemoryUsage); - - long getMemoryUsageMinimum(); - - void setMemoryUsageMinimum(long minimumMemoryUsage); - - long getMemoryUsageCurrent(); - - long getMaximumMessageSize(); - - void setMaximumMessageSize(long value); - - - long getMaximumMessageCount(); - - void setMaximumMessageCount(long value); - - - long getMaximumQueueDepth(); - - void setMaximumQueueDepth(long value); - - - long getMaximumMessageAge(); - - void setMaximumMessageAge(final long maximumMessageAge); - - - long getMinimumAlertRepeatGap(); - - void setMinimumAlertRepeatGap(long value); - - - void deleteMessageFromTop(StoreContext storeContext) throws AMQException; - - long clearQueue(StoreContext storeContext) throws AMQException; - - /** - * Checks the status of messages on the queue, purging expired ones, firing age related alerts etc. - * @throws AMQException - */ - void checkMessageStatus() throws AMQException; - - Set<NotificationCheck> getNotificationChecks(); - - void flushSubscription(final Subscription sub) throws AMQException; - - void deliverAsync(final Subscription sub); - - void deliverAsync(); - - void stop(); - - /** - * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription - * already exists. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Represent failure to create a subscription, because an exclusive subscription already exists. - * </table> - * - * @todo Not an AMQP exception as no status code. - * - * @todo Move to top level, used outside this class. - */ - static final class ExistingExclusiveSubscription extends AMQException - { - - public ExistingExclusiveSubscription() - { - super(""); - } - } - - /** - * ExistingSubscriptionPreventsExclusive signals a failure to create an exclusize subscription, as a subscription - * already exists. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Represent failure to create an exclusize subscription, as a subscription already exists. - * </table> - * - * @todo Not an AMQP exception as no status code. - * - * @todo Move to top level, used outside this class. - */ - static final class ExistingSubscriptionPreventsExclusive extends AMQException - { - public ExistingSubscriptionPreventsExclusive() - { - super(""); - } - } - - static interface Task - { - public void doTask(AMQQueue queue) throws AMQException; - } - - void configure(QueueConfiguration config); -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java deleted file mode 100644 index b77a9d8f6a..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.configuration.QueueConfiguration; -import org.apache.qpid.server.virtualhost.VirtualHost; - -public class AMQQueueFactory -{ - public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities"); - public static final AMQShortString QPID_MAX_COUNT = new AMQShortString("qpid.max_count"); - public static final AMQShortString QPID_MAX_SIZE = new AMQShortString("qpid.max_size"); - public static final AMQShortString QPID_POLICY_TYPE = new AMQShortString("qpid.policy_type"); - public static final String QPID_FLOW_TO_DISK = "flow_to_disk"; - - public static AMQQueue createAMQQueueImpl(AMQShortString name, - boolean durable, - AMQShortString owner, - boolean autoDelete, - VirtualHost virtualHost, final FieldTable arguments) - throws AMQException - { - - int priorities = 1; - - if (arguments != null && arguments.containsKey(X_QPID_PRIORITIES)) - { - Integer priority = arguments.getInteger(X_QPID_PRIORITIES); - - if (priority != null) - { - priorities = priority.intValue(); - } - else - { - throw new AMQException(AMQConstant.INVALID_ARGUMENT, - "Queue create request with non integer value for :" + X_QPID_PRIORITIES + "=" + arguments.get(X_QPID_PRIORITIES), null); - } - - } - - AMQQueue q = null; - if (priorities > 1) - { - q = new AMQPriorityQueue(name, durable, owner, autoDelete, virtualHost, priorities); - } - else - { - q = new SimpleAMQQueue(name, durable, owner, autoDelete, virtualHost); - } - - final String queuePolicyType = arguments == null ? null : - arguments.containsKey(QPID_POLICY_TYPE) ? arguments.getString(QPID_POLICY_TYPE) : null; - - if (queuePolicyType != null) - { - if (queuePolicyType.equals(QPID_FLOW_TO_DISK)) - { - if (arguments.containsKey(QPID_MAX_SIZE)) - { - - final long queueSize = arguments.getInteger(QPID_MAX_SIZE); - - if (queueSize < 0) - { - throw new AMQException(AMQConstant.INVALID_ARGUMENT, - "Queue create request with negative size:" + queueSize, null); - } - - q.setMemoryUsageMaximum(queueSize); - } - else - { - throw new AMQException(AMQConstant.INVALID_ARGUMENT, - "Queue create request with no qpid.max_size value,", null); - } - } - else - { - throw new AMQException(AMQConstant.NOT_IMPLEMENTED, - "Queue create request with unknown Policy Type:" + queuePolicyType, null); - } - - } - - //Register the new queue - virtualHost.getQueueRegistry().registerQueue(q); - return q; - } - - public static AMQQueue createAMQQueueImpl(QueueConfiguration config, VirtualHost host) throws AMQException - { - AMQShortString queueName = new AMQShortString(config.getName()); - - boolean durable = config.getDurable(); - boolean autodelete = config.getAutoDelete(); - AMQShortString owner = (config.getOwner() != null) ? new AMQShortString(config.getOwner()) : null; - FieldTable arguments = null; - boolean priority = config.getPriority(); - int priorities = config.getPriorities(); - if (priority || priorities > 0) - { - if (arguments == null) - { - arguments = new FieldTable(); - } - if (priorities < 0) - { - priorities = 10; - } - arguments.put(new AMQShortString("x-qpid-priorities"), priorities); - } - - AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, host, arguments); - q.configure(config); - return q; - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java deleted file mode 100644 index b46d6b6f12..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ /dev/null @@ -1,494 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.log4j.Logger; - -import org.apache.mina.common.ByteBuffer; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.CommonContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.server.management.AMQManagedObject; -import org.apache.qpid.server.management.MBeanConstructor; -import org.apache.qpid.server.management.MBeanDescription; -import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.store.StoreContext; - -import javax.management.JMException; -import javax.management.MBeanException; -import javax.management.MBeanNotificationInfo; -import javax.management.Notification; -import javax.management.OperationsException; -import javax.management.monitor.MonitorNotification; -import javax.management.openmbean.ArrayType; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.CompositeDataSupport; -import javax.management.openmbean.CompositeType; -import javax.management.openmbean.OpenDataException; -import javax.management.openmbean.OpenType; -import javax.management.openmbean.SimpleType; -import javax.management.openmbean.TabularData; -import javax.management.openmbean.TabularDataSupport; -import javax.management.openmbean.TabularType; - -import java.text.SimpleDateFormat; -import java.util.*; - -/** - * AMQQueueMBean is the management bean for an {@link AMQQueue}. - * - * <p/><tablse id="crc"><caption>CRC Caption</caption> - * <tr><th> Responsibilities <th> Collaborations - * </table> - */ -@MBeanDescription("Management Interface for AMQQueue") -public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener -{ - /** Used for debugging purposes. */ - private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class); - - private static final SimpleDateFormat _dateFormat = new SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z"); - - /** - * Since the MBean is not associated with a real channel we can safely create our own store context - * for use in the few methods that require one. - */ - private StoreContext _storeContext = new StoreContext(); - - private AMQQueue _queue = null; - private String _queueName = null; - // OpenMBean data types for viewMessages method - private static final String[] _msgAttributeNames = { "AMQ MessageId", "Header", "Size(bytes)", "Redelivered" }; - private static String[] _msgAttributeIndex = { _msgAttributeNames[0] }; - private static OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types. - private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data. - private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list. - - // OpenMBean data types for viewMessageContent method - private static CompositeType _msgContentType = null; - private static final String[] _msgContentAttributes = { "AMQ MessageId", "MimeType", "Encoding", "Content" }; - private static OpenType[] _msgContentAttributeTypes = new OpenType[4]; - - private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length]; - private Notification _lastNotification = null; - - - - - @MBeanConstructor("Creates an MBean exposing an AMQQueue") - public AMQQueueMBean(AMQQueue queue) throws JMException - { - super(ManagedQueue.class, ManagedQueue.TYPE, ManagedQueue.VERSION); - _queue = queue; - _queueName = jmxEncode(new StringBuffer(queue.getName()), 0).toString(); - } - - public ManagedObject getParentObject() - { - return _queue.getVirtualHost().getManagedObject(); - } - - static - { - try - { - init(); - } - catch (JMException ex) - { - // This is not expected to ever occur. - throw new RuntimeException("Got JMException in static initializer.", ex); - } - } - - /** - * initialises the openmbean data types - */ - private static void init() throws OpenDataException - { - _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id - _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType - _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding - _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content - _msgContentType = - new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes, _msgContentAttributes, - _msgContentAttributeTypes); - - _msgAttributeTypes[0] = SimpleType.LONG; // For message id - _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes - _msgAttributeTypes[2] = SimpleType.LONG; // For size - _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered - - _messageDataType = - new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes); - _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex); - } - - public String getObjectInstanceName() - { - return _queueName; - } - - public String getName() - { - return _queueName; - } - - public boolean isDurable() - { - return _queue.isDurable(); - } - - public String getOwner() - { - return String.valueOf(_queue.getOwner()); - } - - public boolean isAutoDelete() - { - return _queue.isAutoDelete(); - } - - public Integer getMessageCount() - { - return _queue.getMessageCount(); - } - - public Long getMaximumMessageSize() - { - return _queue.getMaximumMessageSize(); - } - - public Long getMaximumMessageAge() - { - return _queue.getMaximumMessageAge(); - } - - public void setMaximumMessageAge(Long maximumMessageAge) - { - _queue.setMaximumMessageAge(maximumMessageAge); - } - - public void setMaximumMessageSize(Long value) - { - _queue.setMaximumMessageSize(value); - } - - public Integer getConsumerCount() - { - return _queue.getConsumerCount(); - } - - public Integer getActiveConsumerCount() - { - return _queue.getActiveConsumerCount(); - } - - public Long getReceivedMessageCount() - { - return _queue.getReceivedMessageCount(); - } - - public Long getMaximumMessageCount() - { - return _queue.getMaximumMessageCount(); - } - - public void setMaximumMessageCount(Long value) - { - _queue.setMaximumMessageCount(value); - } - - /** - * returns the maximum total size of messages(bytes) in the queue. - */ - public Long getMaximumQueueDepth() - { - return _queue.getMaximumQueueDepth(); - } - - public void setMaximumQueueDepth(Long value) - { - _queue.setMaximumQueueDepth(value); - } - - public Long getMemoryUsageMaximum() - { - return _queue.getMemoryUsageMaximum(); - } - - public void setMemoryUsageMaximum(Long maximumMemoryUsage) - { - _queue.setMemoryUsageMaximum(maximumMemoryUsage); - } - - public Long getMemoryUsageMinimum() - { - return _queue.getMemoryUsageMinimum(); - } - - public void setMemoryUsageMinimum(Long minimumMemoryUsage) - { - _queue.setMemoryUsageMinimum(minimumMemoryUsage); - } - - public Long getMemoryUsageCurrent() - { - return _queue.getMemoryUsageCurrent(); - } - - public boolean isFlowed() - { - return _queue.isFlowed(); - } - - /** - * returns the total size of messages(bytes) in the queue. - */ - public Long getQueueDepth() throws JMException - { - return _queue.getQueueDepth(); - } - - /** - * Checks if there is any notification to be send to the listeners - * @param queueEntry - */ - public void checkForNotification(QueueEntry queueEntry) throws AMQException - { - - final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks(); - - if(!notificationChecks.isEmpty()) - { - final long currentTime = System.currentTimeMillis(); - final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap(); - - for (NotificationCheck check : notificationChecks) - { - if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime)) - { - if (check.notifyIfNecessary(queueEntry, _queue, this)) - { - _lastNotificationTimes[check.ordinal()] = currentTime; - } - } - } - } - - } - - /** - * Sends the notification to the listeners - */ - public void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg) - { - // important : add log to the log file - monitoring tools may be looking for this - _logger.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg); - notificationMsg = notification.name() + " " + notificationMsg; - - _lastNotification = - new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber, - System.currentTimeMillis(), notificationMsg); - - _broadcaster.sendNotification(_lastNotification); - } - - public Notification getLastNotification() - { - return _lastNotification; - } - - /** - * @see AMQQueue#deleteMessageFromTop - */ - public void deleteMessageFromTop() throws JMException - { - try - { - _queue.deleteMessageFromTop(_storeContext); - } - catch (AMQException ex) - { - throw new MBeanException(ex, ex.toString()); - } - } - - /** - * @see AMQQueue#clearQueue - */ - public void clearQueue() throws JMException - { - try - { - _queue.clearQueue(_storeContext); - } - catch (AMQException ex) - { - throw new MBeanException(ex, ex.toString()); - } - } - - /** - * returns message content as byte array and related attributes for the given message id. - */ - public CompositeData viewMessageContent(long msgId) throws JMException - { - QueueEntry entry = _queue.getMessageOnTheQueue(msgId); - - if (entry == null) - { - throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName); - } - - AMQMessage msg = entry.getMessage(); - // get message content - Iterator<ContentChunk> cBodies = msg.getContentBodyIterator(); - List<Byte> msgContent = new ArrayList<Byte>(); - while (cBodies.hasNext()) - { - ContentChunk body = cBodies.next(); - if (body.getSize() != 0) - { - if (body.getSize() != 0) - { - ByteBuffer slice = body.getData().slice(); - for (int j = 0; j < slice.limit(); j++) - { - msgContent.add(slice.get()); - } - } - } - } - - // Create header attributes list - CommonContentHeaderProperties headerProperties = - (CommonContentHeaderProperties) msg.getContentHeaderBody().properties; - String mimeType = null, encoding = null; - if (headerProperties != null) - { - AMQShortString mimeTypeShortSting = headerProperties.getContentType(); - mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString(); - encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString(); - } - - Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) }; - - return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues); - } - - /** - * Returns the header contents of the messages stored in this queue in tabular form. - */ - public TabularData viewMessages(int beginIndex, int endIndex) throws JMException - { - if ((beginIndex > endIndex) || (beginIndex < 1)) - { - throw new OperationsException("From Index = " + beginIndex + ", To Index = " + endIndex - + "\n\"From Index\" should be greater than 0 and less than \"To Index\""); - } - - List<QueueEntry> list = _queue.getMessagesOnTheQueue(); - TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType); - - // Create the tabular list of message header contents - for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++) - { - QueueEntry queueEntry = list.get(i - 1); - AMQMessage msg = queueEntry.getMessage(); - ContentHeaderBody headerBody = msg.getContentHeaderBody(); - // Create header attributes list - String[] headerAttributes = getMessageHeaderProperties(headerBody); - Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, - queueEntry.isRedelivered() }; - CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues); - _messageList.put(messageData); - } - - return _messageList; - } - - private String[] getMessageHeaderProperties(ContentHeaderBody headerBody) - { - List<String> list = new ArrayList<String>(); - BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties; - list.add("reply-to = " + headerProperties.getReplyToAsString()); - list.add("propertyFlags = " + headerProperties.getPropertyFlags()); - list.add("ApplicationID = " + headerProperties.getAppIdAsString()); - list.add("ClusterID = " + headerProperties.getClusterIdAsString()); - list.add("UserId = " + headerProperties.getUserIdAsString()); - list.add("JMSMessageID = " + headerProperties.getMessageIdAsString()); - list.add("JMSCorrelationID = " + headerProperties.getCorrelationIdAsString()); - - int delMode = headerProperties.getDeliveryMode(); - list.add("JMSDeliveryMode = " + - ((delMode == BasicContentHeaderProperties.PERSISTENT) ? "Persistent" : "Non_Persistent")); - - list.add("JMSPriority = " + headerProperties.getPriority()); - list.add("JMSType = " + headerProperties.getType()); - - long longDate = headerProperties.getExpiration(); - String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null; - list.add("JMSExpiration = " + strDate); - - longDate = headerProperties.getTimestamp(); - strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null; - list.add("JMSTimestamp = " + strDate); - - return list.toArray(new String[list.size()]); - } - - /** - * @see ManagedQueue#moveMessages - * @param fromMessageId - * @param toMessageId - * @param toQueueName - * @throws JMException - */ - public void moveMessages(long fromMessageId, long toMessageId, String toQueueName) throws JMException - { - if ((fromMessageId > toMessageId) || (fromMessageId < 1)) - { - throw new OperationsException("\"From MessageId\" should be greater then 0 and less then \"To MessageId\""); - } - - _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, _storeContext); - } - - /** - * returns Notifications sent by this MBean. - */ - @Override - public MBeanNotificationInfo[] getNotificationInfo() - { - String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED }; - String name = MonitorNotification.class.getName(); - String description = "Either Message count or Queue depth or Message size has reached threshold high value"; - MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description); - - return new MBeanNotificationInfo[] { info1 }; - } - -} // End of AMQQueueMBean class diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java deleted file mode 100644 index cbe9246f09..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import java.util.Collection; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -public class DefaultQueueRegistry implements QueueRegistry -{ - private ConcurrentMap<AMQShortString, AMQQueue> _queueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>(); - - private final VirtualHost _virtualHost; - - public DefaultQueueRegistry(VirtualHost virtualHost) - { - _virtualHost = virtualHost; - } - - public VirtualHost getVirtualHost() - { - return _virtualHost; - } - - public void registerQueue(AMQQueue queue) throws AMQException - { - _queueMap.put(queue.getName(), queue); - } - - public void unregisterQueue(AMQShortString name) throws AMQException - { - _queueMap.remove(name); - } - - public AMQQueue getQueue(AMQShortString name) - { - return _queueMap.get(name); - } - - public Collection<AMQShortString> getQueueNames() - { - return _queueMap.keySet(); - } - - public Collection<AMQQueue> getQueues() - { - return _queueMap.values(); - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBinding.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBinding.java deleted file mode 100644 index a2fcab9e73..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBinding.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.AMQException; - -public class ExchangeBinding -{ - private final Exchange _exchange; - private final AMQShortString _routingKey; - private final FieldTable _arguments; - - private static final FieldTable EMPTY_ARGUMENTS = new FieldTable(); - - ExchangeBinding(AMQShortString routingKey, Exchange exchange) - { - this(routingKey, exchange, EMPTY_ARGUMENTS); - } - - ExchangeBinding(AMQShortString routingKey, Exchange exchange, FieldTable arguments) - { - _routingKey = routingKey == null ? AMQShortString.EMPTY_STRING : routingKey; - _exchange = exchange; - _arguments = arguments == null ? EMPTY_ARGUMENTS : arguments; - } - - void unbind(AMQQueue queue) throws AMQException - { - _exchange.deregisterQueue(_routingKey, queue, _arguments); - } - - public Exchange getExchange() - { - return _exchange; - } - - public AMQShortString getRoutingKey() - { - return _routingKey; - } - - public FieldTable getArguments() - { - return _arguments; - } - - public int hashCode() - { - return (_exchange == null ? 0 : _exchange.hashCode()) - + (_routingKey == null ? 0 : _routingKey.hashCode()); - } - - public boolean equals(Object o) - { - if (!(o instanceof ExchangeBinding)) - { - return false; - } - ExchangeBinding eb = (ExchangeBinding) o; - return _exchange.equals(eb._exchange) - && _routingKey.equals(eb._routingKey); - } -}
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java deleted file mode 100644 index fb839c1783..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import java.util.HashSet; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.exchange.Exchange; - -/** - * When a queue is deleted, it should be deregistered from any - * exchange it has been bound to. This class assists in this task, - * by keeping track of all bindings for a given queue. - */ -class ExchangeBindings -{ - private final List<ExchangeBinding> _bindings = new CopyOnWriteArrayList<ExchangeBinding>(); - private final AMQQueue _queue; - - ExchangeBindings(AMQQueue queue) - { - _queue = queue; - } - - /** - * Adds the specified binding to those being tracked. - * @param routingKey the routing key with which the queue whose bindings - * are being tracked by the instance has been bound to the exchange - * @param exchange the exchange bound to - */ - void addBinding(AMQShortString routingKey, FieldTable arguments, Exchange exchange) - { - _bindings.add(new ExchangeBinding(routingKey, exchange, arguments)); - } - - - public boolean remove(AMQShortString routingKey, FieldTable arguments, Exchange exchange) - { - return _bindings.remove(new ExchangeBinding(routingKey, exchange, arguments)); - } - - - /** - * Deregisters this queue from any exchange it has been bound to - */ - void deregister() throws AMQException - { - //remove duplicates at this point - HashSet<ExchangeBinding> copy = new HashSet<ExchangeBinding>(_bindings); - for (ExchangeBinding b : copy) - { - b.unbind(_queue); - } - } - - List<ExchangeBinding> getExchangeBindings() - { - return _bindings; - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java deleted file mode 100644 index 6466e81dd2..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; - -/** - * Signals that the dequeue of a message from a queue failed. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Indicates the a message could not be dequeued from a queue. - * <tr><td> - * </table> - * - * @todo Not an AMQP exception as no status code. - * - * @todo Happens as a consequence of a message store failure, or reference counting error. Both of which migh become - * runtime exceptions, as unrecoverable conditions? In which case this one might be dropped too. - */ -public class FailedDequeueException extends AMQException -{ - public FailedDequeueException(String queue) - { - super("Failed to dequeue message from " + queue); - } - - public FailedDequeueException(String queue, AMQException e) - { - super("Failed to dequeue message from " + queue, e); - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java deleted file mode 100644 index b9d07d032b..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java +++ /dev/null @@ -1,385 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.log4j.Logger; -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.util.FileUtils; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; - -public class FileQueueBackingStore implements QueueBackingStore -{ - private static final Logger _log = Logger.getLogger(FileQueueBackingStore.class); - - private String _flowToDiskLocation; - - public FileQueueBackingStore(String location) - { - _flowToDiskLocation = location; - } - - public AMQMessage load(Long messageId) - { - _log.info("Loading Message (ID:" + messageId + ")"); - - MessageMetaData mmd; - - File handle = getFileHandle(messageId); - - ObjectInputStream input = null; - - Exception error = null; - try - { - input = new ObjectInputStream(new FileInputStream(handle)); - - long arrivaltime = input.readLong(); - - final AMQShortString exchange = new AMQShortString(input.readUTF()); - final AMQShortString routingKey = new AMQShortString(input.readUTF()); - final boolean mandatory = input.readBoolean(); - final boolean immediate = input.readBoolean(); - - int bodySize = input.readInt(); - byte[] underlying = new byte[bodySize]; - - input.readFully(underlying, 0, bodySize); - - ByteBuffer buf = ByteBuffer.wrap(underlying); - - ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(buf, bodySize); - - int chunkCount = input.readInt(); - - // There are WAY to many annonymous MPIs in the code this should be made concrete. - MessagePublishInfo info = new MessagePublishInfo() - { - - public AMQShortString getExchange() - { - return exchange; - } - - public void setExchange(AMQShortString exchange) - { - - } - - public boolean isImmediate() - { - return immediate; - } - - public boolean isMandatory() - { - return mandatory; - } - - public AMQShortString getRoutingKey() - { - return routingKey; - } - }; - - mmd = new MessageMetaData(info, chb, chunkCount); - mmd.setArrivalTime(arrivaltime); - - AMQMessage message; - if (((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == - BasicContentHeaderProperties.PERSISTENT) - { - message = new PersistentAMQMessage(messageId, null); - } - else - { - message = new TransientAMQMessage(messageId); - } - - message.recoverFromMessageMetaData(mmd); - - for (int chunk = 0; chunk < chunkCount; chunk++) - { - int length = input.readInt(); - - byte[] data = new byte[length]; - - input.readFully(data, 0, length); - - try - { - message.recoverContentBodyFrame(new RecoverDataBuffer(length, data), (chunk + 1 == chunkCount)); - } - catch (AMQException e) - { - //ignore as this will not occur. - // It is thrown by the _transactionLog method in load on PersistentAMQMessage - // but we have created the message with a null log and will never call that method. - } - } - - return message; - } - catch (Exception e) - { - error = e; - } - finally - { - try - { - if (input != null) - { - input.close(); - } - } - catch (IOException e) - { - _log.info("Unable to close input on message(" + messageId + ") recovery due to:" + e.getMessage()); - } - } - - throw new UnableToRecoverMessageException(error); - } - - /** - * Thread safety is ensured here by synchronizing on the message object. - * - * This is safe as load() calls will fail until the first thread through here has created the file on disk - * and fully written the content. - * - * After this point new AMQMessages can exist that reference the same data thus breaking the synchronisation. - * - * Thread safety is maintained here as the existence of the file is checked allowing then subsequent unload() calls - * to skip the writing. - * - * Multiple unload() calls will initially be blocked using the synchronization until the data exists on disk thus - * safely allowing any reference to the message to be cleared prompting a load call. - * - * @param message the message to unload - * @throws UnableToFlowMessageException - */ - public void unload(AMQMessage message) throws UnableToFlowMessageException - { - //Synchorize on the message to ensure that one only thread can unload at a time. - // If a second unload is attempted then it will block until the unload has completed. - synchronized (message) - { - long messageId = message.getMessageId(); - - File handle = getFileHandle(messageId); - - //If we have written the data once then we don't need to do it again. - if (handle.exists()) - { - if (_log.isDebugEnabled()) - { - _log.debug("Message(ID:" + messageId + ") already unloaded."); - } - return; - } - - if (_log.isInfoEnabled()) - { - _log.info("Unloading Message (ID:" + messageId + ")"); - } - - ObjectOutputStream writer = null; - Exception error = null; - - try - { - writer = new ObjectOutputStream(new FileOutputStream(handle)); - - writer.writeLong(message.getArrivalTime()); - - MessagePublishInfo mpi = message.getMessagePublishInfo(); - writer.writeUTF(String.valueOf(mpi.getExchange())); - writer.writeUTF(String.valueOf(mpi.getRoutingKey())); - writer.writeBoolean(mpi.isMandatory()); - writer.writeBoolean(mpi.isImmediate()); - ContentHeaderBody chb = message.getContentHeaderBody(); - - // write out the content header body - final int bodySize = chb.getSize(); - byte[] underlying = new byte[bodySize]; - ByteBuffer buf = ByteBuffer.wrap(underlying); - chb.writePayload(buf); - - writer.writeInt(bodySize); - writer.write(underlying, 0, bodySize); - - int bodyCount = message.getBodyCount(); - writer.writeInt(bodyCount); - - //WriteContentBody - for (int index = 0; index < bodyCount; index++) - { - ContentChunk chunk = message.getContentChunk(index); - int length = chunk.getSize(); - - byte[] chunk_underlying = new byte[length]; - - ByteBuffer chunk_buf = chunk.getData(); - - chunk_buf.duplicate().rewind().get(chunk_underlying); - - writer.writeInt(length); - writer.write(chunk_underlying, 0, length); - } - } - catch (FileNotFoundException e) - { - error = e; - } - catch (IOException e) - { - error = e; - } - finally - { - // In a FileNotFound situation writer will be null. - if (writer != null) - { - try - { - writer.flush(); - writer.close(); - } - catch (IOException e) - { - error = e; - } - } - } - - if (error != null) - { - _log.error("Unable to unload message(" + messageId + ") to disk, restoring state."); - handle.delete(); - throw new UnableToFlowMessageException(messageId, error); - } - } - } - - /** - * Use the messageId to calculate the file path on disk. - * - * Current implementation will give us 256 bins. - * Therefore the maximum messages that can be flowed before error/platform is: - * ext3 : 256 bins * 32000 = 8192000 - * FAT32 : 256 bins * 65534 = 16776704 - * Other FS have much greater limits than we need to worry about. - * - * @param messageId the Message we need a file Handle for. - * - * @return the File handle - */ - private File getFileHandle(long messageId) - { - // grab the 8 LSB to give us 256 bins - long bin = messageId & 0xFFL; - - String bin_path = _flowToDiskLocation + File.separator + bin; - File bin_dir = new File(bin_path); - - if (!bin_dir.exists()) - { - bin_dir.mkdirs(); - } - - String id = bin_path + File.separator + messageId; - - return new File(id); - } - - public void delete(Long messageId) - { - File handle = getFileHandle(messageId); - - if (handle.exists()) - { - if (_log.isInfoEnabled()) - { - _log.info("Message(" + messageId + ") delete flowToDisk."); - } - if (!handle.delete()) - { - throw new RuntimeException("Unable to delete flowToDisk data"); - } - } - } - - public void close() - { - _log.info("Closing Backing store at:" + _flowToDiskLocation); - if (!FileUtils.delete(new File(_flowToDiskLocation), true)) - { - // Attempting a second time appears to ensure that it is deleted. - if (!FileUtils.delete(new File(_flowToDiskLocation), true)) - { - _log.error("Unable to fully delete backing store location"); - } - } - } - - private class RecoverDataBuffer implements ContentChunk - { - private int _length; - private ByteBuffer _dataBuffer; - - public RecoverDataBuffer(int length, byte[] data) - { - _length = length; - _dataBuffer = ByteBuffer.wrap(data); - } - - public int getSize() - { - return _length; - } - - public ByteBuffer getData() - { - return _dataBuffer; - } - - public void reduceToFit() - { - - } - - } - -} - diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java deleted file mode 100644 index 8981db0071..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.commons.configuration.ConfigurationException; -import org.apache.log4j.Logger; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.util.FileUtils; - -import java.io.File; - -public class FileQueueBackingStoreFactory implements QueueBackingStoreFactory -{ - private static final Logger _log = Logger.getLogger(FileQueueBackingStoreFactory.class); - - private String _flowToDiskLocation; - public static final String QUEUE_BACKING_DIR = "queueBacking"; - - public void configure(VirtualHost virtualHost, VirtualHostConfiguration config) throws ConfigurationException - { - setFlowToDisk(virtualHost.getName(), config.getFlowToDiskLocation()); - } - - private void setFlowToDisk(String vHostName, String location) throws ConfigurationException - { - if (vHostName == null) - { - throw new ConfigurationException("Unable to setup to Flow to Disk as Virtualhost name was not specified"); - } - - if (location == null) - { - throw new ConfigurationException("Unable to setup to Flow to Disk as location was not specified."); - } - - _flowToDiskLocation = location; - - _flowToDiskLocation += File.separator + QUEUE_BACKING_DIR + File.separator + vHostName; - - //Check the location we will create QUEUE_BACKING_DIR in. - File root = new File(location); - if (!root.exists()) - { - throw new ConfigurationException("Specified Flow to Disk root does not exist:" + root.getAbsolutePath()); - } - else - { - - if (root.isFile()) - { - throw new ConfigurationException("Unable to create Temporary Flow to Disk store as specified root is a file:" + - root.getAbsolutePath()); - } - - if (!root.canWrite()) - { - throw new ConfigurationException("Unable to create Temporary Flow to Disk store. Unable to write to specified root:" + - root.getAbsolutePath()); - } - - } - - // if we don't mark QUEUE_BAKCING_DIR as a deleteOnExit it will remain. - File backingDir = new File(location + File.separator + QUEUE_BACKING_DIR); - if (backingDir.exists()) - { - if (!FileUtils.delete(backingDir, true)) - { - throw new ConfigurationException("Unable to delete existing Flow to Disk root at:" - + backingDir.getAbsolutePath()); - } - - if (backingDir.isFile()) - { - throw new ConfigurationException("Unable to create Temporary Flow to Disk root as specified location is a file:" + - backingDir.getAbsolutePath()); - } - } - - backingDir.deleteOnExit(); - if (!backingDir.mkdirs()) - { - throw new ConfigurationException("Unable to create Temporary Flow to Disk root:" + location + File.separator + QUEUE_BACKING_DIR); - } - - - File store = new File(_flowToDiskLocation); - if (store.exists()) - { - if (!FileUtils.delete(store, true)) - { - throw new ConfigurationException("Unable to delete existing Flow to Disk store at:" - + store.getAbsolutePath()); - } - - if (store.isFile()) - { - throw new ConfigurationException("Unable to create Temporary Flow to Disk store as specified location is a file:" + - store.getAbsolutePath()); - } - - } - - _log.info("Creating Flow to Disk Store : " + store.getAbsolutePath()); - store.deleteOnExit(); - if (!store.mkdir()) - { - throw new ConfigurationException("Unable to create Temporary Flow to Disk store:" + store.getAbsolutePath()); - } - } - - public QueueBackingStore createBacking(AMQQueue queue) - { - return new FileQueueBackingStore(createStore(queue.getName().toString())); - } - - private String createStore(String name) - { - return createStore(name, 0); - } - - /** - * Returns a hash code for non-null Object x. - * Uses the same hash code spreader as most other java.util hash tables. - * - * Borrowed from the Apache Harmony project - * @param x the object serving as a key - * @return the hash code - */ - public static int hash(Object x) { - int h = x.hashCode(); - h += ~(h << 9); - h ^= (h >>> 14); - h += (h << 4); - h ^= (h >>> 10); - return h; - } - - private String createStore(String name, int index) - { - - int hash = hash(name); - - long bin = hash & 0xFFL; - - String store = _flowToDiskLocation + File.separator + bin + File.separator + name; - - if (index > 0) - { - store += "-" + index; - } - - //TODO ensure name is safe for the OS i.e. on OSX you can't have any ':' - // Does java take care of this? - - File storeFile = new File(store); - - if (storeFile.exists()) - { - return createStore(name, index + 1); - } - - // Ensure we report an error if we cannot create the backing store. - if (!storeFile.mkdirs()) - { - _log.error("Unable to create queue backing directory for queue:" + name); - throw new RuntimeException("Unable to create queue backing directory for queue:" + name); - } - - storeFile.deleteOnExit(); - - return store; - } - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java deleted file mode 100644 index d38932bb61..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java +++ /dev/null @@ -1,33 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.queue; - -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.AMQException; - -public interface Filterable<E extends Exception> -{ - ContentHeaderBody getContentHeaderBody() throws E; - - boolean isPersistent() throws E; - - boolean isRedelivered(); -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java deleted file mode 100644 index b252c7304e..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java +++ /dev/null @@ -1,550 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.log4j.Logger; -import org.apache.qpid.pool.ReferenceCountingExecutorService; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -/** This is an abstract base class to handle */ -public abstract class FlowableBaseQueueEntryList implements QueueEntryList -{ - protected static final Logger _log = Logger.getLogger(FlowableBaseQueueEntryList.class); - - private final AtomicInteger _atomicQueueCount = new AtomicInteger(0); - private final AtomicLong _atomicQueueSize = new AtomicLong(0L); - protected final AtomicLong _atomicQueueInMemory = new AtomicLong(0L); - /** The maximum amount of memory that is allocated to this queue. Beyond this the queue will flow to disk. */ - - protected long _memoryUsageMaximum = -1L; - - /** The minimum amount of memory that is allocated to this queue. If the queueDepth hits this level then more flowed data can be read in. */ - protected long _memoryUsageMinimum = 0; - private volatile AtomicBoolean _flowed; - private QueueBackingStore _backingStore; - protected AMQQueue _queue; - private Executor _inhaler; - private Executor _purger; - private AtomicBoolean _stopped; - private AtomicReference<MessageInhaler> _asynchronousInhaler = new AtomicReference(null); - protected boolean _disableFlowToDisk; - private AtomicReference<MessagePurger> _asynchronousPurger = new AtomicReference(null); - private static final int BATCH_PROCESS_COUNT = 100; - protected FlowableBaseQueueEntryList _parentQueue; - - FlowableBaseQueueEntryList(AMQQueue queue) - { - _queue = queue; - _flowed = new AtomicBoolean(false); - VirtualHost vhost = queue.getVirtualHost(); - if (vhost != null) - { - _backingStore = vhost.getQueueBackingStoreFactory().createBacking(queue); - } - - _stopped = new AtomicBoolean(false); - _inhaler = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); - _purger = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); - _disableFlowToDisk = true; - } - - public void setFlowed(boolean flowed) - { - if (_flowed.get() != flowed) - { - _log.warn("Marking Queue(" + _queue.getName() + ") as flowed (" + flowed + ")"); - _flowed.set(flowed); - } - } - - protected void showUsage() - { - showUsage(""); - } - - protected void showUsage(String prefix) - { - if (_log.isTraceEnabled()) - { - _log.trace(prefix + " Queue(" + _queue.getName() + ") usage:" + memoryUsed() - + "/" + getMemoryUsageMinimum() + "<>" + getMemoryUsageMaximum() - + "/" + dataSize()); - } - } - - public boolean isFlowed() - { - if (_parentQueue != null) - { - return _parentQueue.isFlowed(); - } - else - { - return _flowed.get(); - } - } - - public int size() - { - return _atomicQueueCount.get(); - } - - public long dataSize() - { - return _atomicQueueSize.get(); - } - - public long memoryUsed() - { - return _atomicQueueInMemory.get(); - } - - public void setMemoryUsageMaximum(long maximumMemoryUsage) - { - _memoryUsageMaximum = maximumMemoryUsage; - - if (maximumMemoryUsage >= 0) - { - _disableFlowToDisk = false; - } - - // Don't attempt to start the inhaler/purger unless we have a minimum value specified. - if (_memoryUsageMaximum >= 0) - { - setMemoryUsageMinimum(_memoryUsageMaximum / 2); - - // if we have now have to much memory in use we need to purge. - if (_memoryUsageMaximum < _atomicQueueInMemory.get()) - { - setFlowed(true); - startPurger(); - } - } - else - { - if (_log.isInfoEnabled()) - { - _log.info("Disabling Flow to Disk for queue:" + _queue.getName()); - } - _disableFlowToDisk = true; - } - } - - public long getMemoryUsageMaximum() - { - return _memoryUsageMaximum; - } - - public void setMemoryUsageMinimum(long minimumMemoryUsage) - { - _memoryUsageMinimum = minimumMemoryUsage; - - // Don't attempt to start the inhaler unless we have a minimum value specified. - if (_memoryUsageMinimum > 0) - { - checkAndStartInhaler(); - } - } - - private void checkAndStartInhaler() - { - // If we've increased the minimum memory above what we have in memory then - // we need to inhale more if there is more - if (!_disableFlowToDisk && _atomicQueueInMemory.get() < _memoryUsageMinimum && _atomicQueueSize.get() > 0) - { - startInhaler(); - } - } - - private void startInhaler() - { - MessageInhaler inhaler = new MessageInhaler(); - - if (_asynchronousInhaler.compareAndSet(null, inhaler)) - { - _inhaler.execute(inhaler); - } - } - - private void startPurger() - { - MessagePurger purger = new MessagePurger(); - - if (_asynchronousPurger.compareAndSet(null, purger)) - { - _purger.execute(purger); - } - } - - public long getMemoryUsageMinimum() - { - return _memoryUsageMinimum; - } - - /** - * Only to be called by the QueueEntry - * - * @param queueEntry the entry to unload - */ - public void entryUnloadedUpdateMemory(QueueEntry queueEntry) - { - if (_parentQueue != null) - { - _parentQueue.entryUnloadedUpdateMemory(queueEntry); - } - else - { - if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0) - { - _log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity()); - } - - checkAndStartInhaler(); - } - } - - /** - * Only to be called from the QueueEntry - * - * @param queueEntry the entry to load - */ - public void entryLoadedUpdateMemory(QueueEntry queueEntry) - { - if (_parentQueue != null) - { - _parentQueue.entryLoadedUpdateMemory(queueEntry); - } - else - { - if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum) - { - _log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum); - setFlowed(true); - startPurger(); - } - } - } - - public void stop() - { - if (!_stopped.getAndSet(true)) - { - // The SimpleAMQQueue keeps running when stopped so we should just release the services - // rather than actively shutdown our threads. - //Shutdown thread for inhaler. - ReferenceCountingExecutorService.getInstance().releaseExecutorService(); - ReferenceCountingExecutorService.getInstance().releaseExecutorService(); - - _backingStore.close(); - } - } - - /** - * Mark this queue as part of another QueueEntryList for accounting purposes. - * - * All Calls from the QueueEntry to the QueueEntryList need to check if there is - * a parent QueueEntrylist upon which the action should take place. - * - * @param queueEntryList The parent queue that is performing accounting. - */ - public void setParentQueueEntryList(FlowableBaseQueueEntryList queueEntryList) - { - _parentQueue = queueEntryList; - } - - protected void incrementCounters(final QueueEntryImpl queueEntry) - { - if (_parentQueue != null) - { - _parentQueue.incrementCounters(queueEntry); - } - else - { - _atomicQueueCount.incrementAndGet(); - _atomicQueueSize.addAndGet(queueEntry.getSize()); - long inUseMemory = _atomicQueueInMemory.addAndGet(queueEntry.getSize()); - - if (!_disableFlowToDisk && inUseMemory > _memoryUsageMaximum) - { - setFlowed(true); - queueEntry.unload(); - } - } - } - - protected void dequeued(QueueEntryImpl queueEntry) - { - if (_parentQueue != null) - { - _parentQueue.dequeued(queueEntry); - } - else - { - _atomicQueueCount.decrementAndGet(); - _atomicQueueSize.addAndGet(-queueEntry.getSize()); - if (!queueEntry.isFlowed()) - { - if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0) - { - _log.error("InMemory Count just went below 0 on dequeue."); - } - } - } - } - - public QueueBackingStore getBackingStore() - { - return _backingStore; - } - - private class MessageInhaler implements Runnable - { - public void run() - { - String threadName = Thread.currentThread().getName(); - Thread.currentThread().setName("Inhaler-" + _queue.getVirtualHost().getName() + "-" + _queue.getName()); - try - { - inhaleList(this); - } - finally - { - Thread.currentThread().setName(threadName); - } - } - } - - private void inhaleList(MessageInhaler messageInhaler) - { - if (_log.isInfoEnabled()) - { - _log.info("Inhaler Running:" + _queue.getName()); - showUsage("Inhaler Running:" + _queue.getName()); - } - // If in memory count is at or over max then we can't inhale - if (_atomicQueueInMemory.get() >= _memoryUsageMaximum) - { - if (_log.isDebugEnabled()) - { - _log.debug("Unable to start inhaling as we are already over quota:" + - _atomicQueueInMemory.get() + ">=" + _memoryUsageMaximum); - } - return; - } - - _asynchronousInhaler.compareAndSet(messageInhaler, null); - int inhaled = 1; - - //Because we may not be able to totally fill up to _memoryUsageMaximum we need to be able to say we've done - // enough loading and this inhale process should stop - boolean finshedInhaling = false; - - while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) // we havn't filled our max memory - && !finshedInhaling // Have we loaded all we can fit into memory - && (_atomicQueueInMemory.get() < _atomicQueueSize.get()) // we haven't loaded all that is available - && (inhaled < BATCH_PROCESS_COUNT) // limit the number of runs we do - && (inhaled > 0) // ensure we could inhale something - && _asynchronousInhaler.compareAndSet(null, messageInhaler)) // Ensure we are the running inhaler - { - inhaled = 0; - QueueEntryIterator iterator = iterator(); - - // If the inhaler is running and delivery rate picks up ensure that we just don't chase the delivery thread. - while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) - && !iterator.getNode().isAvailable() && iterator.advance()) - { - //Find first AVAILABLE node - } - - // Because the above loop checks then moves on to the next entry a check for atTail will return true but - // we won't have checked the last entry to see if we can load it. So create atEndofList and update it based - // on the return from advance() which returns true if it can advance. - boolean atEndofList = false; - - while ((_atomicQueueInMemory.get() <= _memoryUsageMaximum) // we haven't filled our max memory - && !finshedInhaling // Have we loaded all we can fit into memory - && (inhaled < BATCH_PROCESS_COUNT) // limit the number of runs we do - && !atEndofList) // We have reached end of list QueueEntries - { - QueueEntry entry = iterator.getNode(); - - if (entry.isAvailable() && entry.isFlowed()) - { - if (_atomicQueueInMemory.get() + entry.getSize() > _memoryUsageMaximum) - { - // We don't have space for this message so we need to stop inhaling. - if (_log.isDebugEnabled()) - { - _log.debug("Entry won't fit in memory stopping inhaler:" + entry.debugIdentity()); - } - finshedInhaling = true; - } - else - { - entry.load(); - inhaled++; - } - } - - atEndofList = !iterator.advance(); - } - - if (iterator.atTail()) - { - setFlowed(false); - } - - _asynchronousInhaler.set(null); - } - - if (_log.isInfoEnabled()) - { - _log.info("Inhaler Stopping:" + _queue.getName()); - showUsage("Inhaler Stopping:" + _queue.getName()); - } - - //If we have become flowed or have more capacity since we stopped then schedule the thread to run again. - if (!finshedInhaling && _flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum) - { - if (_log.isInfoEnabled()) - { - _log.info("Rescheduling Inhaler:" + _queue.getName()); - } - _inhaler.execute(messageInhaler); - } - - } - - private class MessagePurger implements Runnable - { - public void run() - { - String threadName = Thread.currentThread().getName(); - Thread.currentThread().setName("Purger-" + _queue.getVirtualHost().getName() + "-" + _queue.getName()); - try - { - purgeList(this); - } - finally - { - Thread.currentThread().setName(threadName); - } - } - } - - private void purgeList(MessagePurger messagePurger) - { - // If in memory count is at or over max then we can't inhale - if (_atomicQueueInMemory.get() <= _memoryUsageMinimum) - { - if (_log.isDebugEnabled()) - { - _log.debug("Unable to start purging as we are already below our minimum cache level:" + - _atomicQueueInMemory.get() + "<=" + _memoryUsageMinimum); - } - return; - } - - if (_log.isInfoEnabled()) - { - _log.info("Purger Running:" + _queue.getName()); - showUsage("Purger Running:" + _queue.getName()); - } - - _asynchronousPurger.compareAndSet(messagePurger, null); - int purged = 0; - - while ((_atomicQueueInMemory.get() > _memoryUsageMaximum) - && purged < BATCH_PROCESS_COUNT - && _asynchronousPurger.compareAndSet(null, messagePurger)) - { - QueueEntryIterator iterator = iterator(); - - //There are potentially AQUIRED messages that can be purged but we can't purge the last AQUIRED message - // as it may have just become AQUIRED and not yet delivered. - - //To be safe only purge available messages. This should be fine as long as we have a small prefetch. - while (!iterator.getNode().isAvailable() && iterator.advance()) - { - //Find first AVAILABLE node - } - - // Count up the memory usage to find our minimum point - long memoryUsage = 0; - boolean atTail = false; - while ((memoryUsage < _memoryUsageMaximum) && !atTail) - { - QueueEntry entry = iterator.getNode(); - - if (entry.isAvailable() && !entry.isFlowed()) - { - memoryUsage += entry.getSize(); - // If this message is what puts us over the limit then break - // out of this loop as we need to purge this item. - if (memoryUsage > _memoryUsageMaximum) - { - break; - } - } - - atTail = !iterator.advance(); - } - - //Purge remainging mesages on queue - while (!atTail && (purged < BATCH_PROCESS_COUNT)) - { - QueueEntry entry = iterator.getNode(); - - if (entry.isAvailable() && !entry.isFlowed()) - { - entry.unload(); - purged++; - } - - atTail = !iterator.advance(); - } - - _asynchronousPurger.set(null); - } - - if (_log.isInfoEnabled()) - { - _log.info("Purger Stopping:" + _queue.getName()); - showUsage("Purger Stopping:" + _queue.getName()); - } - - //If we are still flowed and are over the minimum value then schedule to run again. - if (_flowed.get() && _atomicQueueInMemory.get() > _memoryUsageMaximum) - { - if (_log.isInfoEnabled()) - { - _log.info("Rescheduling Purger:" + _queue.getName()); - } - _purger.execute(messagePurger); - } - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java deleted file mode 100644 index 36ca197fa6..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ /dev/null @@ -1,301 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.server.txn.TransactionalContext; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.exchange.NoRouteException; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.transactionlog.TransactionLog; -import org.apache.qpid.AMQException; -import org.apache.log4j.Logger; - -import java.util.ArrayList; - -public class IncomingMessage implements Filterable<RuntimeException> -{ - - /** Used for debugging purposes. */ - private static final Logger _logger = Logger.getLogger(IncomingMessage.class); - - private static final boolean SYNCHED_CLOCKS = - ApplicationRegistry.getInstance().getConfiguration().getSynchedClocks(); - - private final MessagePublishInfo _messagePublishInfo; - private ContentHeaderBody _contentHeaderBody; - private AMQMessage _message; - private final TransactionalContext _txnContext; - - private static final boolean MSG_AUTH = - ApplicationRegistry.getInstance().getConfiguration().getMsgAuth(); - - - /** - * Keeps a track of how many bytes we have received in body frames - */ - private long _bodyLengthReceived = 0; - - /** - * This is stored during routing, to know the queues to which this message should immediately be - * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done - * by the message handle. - */ - private ArrayList<AMQQueue> _destinationQueues; - - private AMQProtocolSession _publisher; - private TransactionLog _messageStore; - private long _expiration; - - private Exchange _exchange; - private static MessageFactory MESSAGE_FACTORY = MessageFactory.getInstance(); - - public IncomingMessage(final MessagePublishInfo info, - final TransactionalContext txnContext, - final AMQProtocolSession publisher, - TransactionLog messasgeStore) - { - if (publisher == null) - { - throw new NullPointerException("Message Publisher cannot be null"); - } - _messagePublishInfo = info; - _txnContext = txnContext; - _publisher = publisher; - _messageStore = messasgeStore; - } - - public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException - { - _contentHeaderBody = contentHeaderBody; - _message = MESSAGE_FACTORY.createMessage(_messageStore, isPersistent()); - } - - public void setExpiration() - { - long expiration = - ((BasicContentHeaderProperties) _contentHeaderBody.properties).getExpiration(); - long timestamp = - ((BasicContentHeaderProperties) _contentHeaderBody.properties).getTimestamp(); - - if (SYNCHED_CLOCKS) - { - _expiration = expiration; - } - else - { - // Update TTL to be in broker time. - if (expiration != 0L) - { - if (timestamp != 0L) - { - // todo perhaps use arrival time - long diff = (System.currentTimeMillis() - timestamp); - - if ((diff > 1000L) || (diff < 1000L)) - { - _expiration = expiration + diff; - } - } - } - } - - } - - public void routingComplete(final TransactionLog transactionLog) throws AMQException - { - - if (isPersistent()) - { - _txnContext.beginTranIfNecessary(); - // enqueuing the messages ensure that if required the destinations are recorded to a - // persistent store - - if(_destinationQueues != null) - { - transactionLog.enqueueMessage(_txnContext.getStoreContext(), _destinationQueues, getMessageId()); - } - } - } - - public AMQMessage deliverToQueues() - throws AMQException - { - - // we get a reference to the destination queues now so that we can clear the - // transient message data as quickly as possible - if (_logger.isDebugEnabled()) - { - _logger.debug("Delivering message " + getMessageId() + " to " + _destinationQueues); - } - - - // first we allow the handle to know that the message has been fully received. This is useful if it is - // maintaining any calculated values based on content chunks - _message.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), _messagePublishInfo, getContentHeaderBody()); - - - - _message.setExpiration(_expiration); - _message.setClientIdentifier(_publisher.getSessionIdentifier()); - - // we then allow the transactional context to do something with the message content - // now that it has all been received, before we attempt delivery - _txnContext.messageFullyReceived(isPersistent()); - - AMQShortString userID = getContentHeaderBody().properties instanceof BasicContentHeaderProperties ? - ((BasicContentHeaderProperties) getContentHeaderBody().properties).getUserId() : null; - - if (MSG_AUTH && !_publisher.getAuthorizedID().getName().equals(userID == null? "" : userID.toString())) - { - throw new UnauthorizedAccessException("Acccess Refused", _message); - } - - if ((_destinationQueues == null) || _destinationQueues.size() == 0) - { - - if (isMandatory() || isImmediate()) - { - throw new NoRouteException("No Route for message", _message); - - } - else - { - _logger.warn("MESSAGE DISCARDED: No routes for message - " + _message); - } - } - else - { - int offset; - final int queueCount = _destinationQueues.size(); - if(queueCount == 1) - { - offset = 0; - } - else - { - offset = ((int)(_message.getMessageId().longValue())) % queueCount; - if(offset < 0) - { - offset = -offset; - } - } - for (int i = offset; i < queueCount; i++) - { - // normal deliver so add this message at the end. - _txnContext.deliver(_destinationQueues.get(i), _message); - } - for (int i = 0; i < offset; i++) - { - // normal deliver so add this message at the end. - _txnContext.deliver(_destinationQueues.get(i), _message); - } - } - - return _message; - - - - } - - public void addContentBodyFrame(final ContentChunk contentChunk) - throws AMQException - { - - _bodyLengthReceived += contentChunk.getSize(); - - _message.addContentBodyFrame(_txnContext.getStoreContext(), contentChunk, allContentReceived()); - - } - - public boolean allContentReceived() - { - return (_bodyLengthReceived == getContentHeaderBody().bodySize); - } - - public AMQShortString getExchange() throws AMQException - { - return _messagePublishInfo.getExchange(); - } - - public AMQShortString getRoutingKey() throws AMQException - { - return _messagePublishInfo.getRoutingKey(); - } - - public boolean isMandatory() throws AMQException - { - return _messagePublishInfo.isMandatory(); - } - - - public boolean isImmediate() throws AMQException - { - return _messagePublishInfo.isImmediate(); - } - - public ContentHeaderBody getContentHeaderBody() - { - return _contentHeaderBody; - } - - - public boolean isPersistent() - { - return getContentHeaderBody().properties instanceof BasicContentHeaderProperties && - ((BasicContentHeaderProperties) getContentHeaderBody().properties).getDeliveryMode() == - BasicContentHeaderProperties.PERSISTENT; - } - - public boolean isRedelivered() - { - return false; - } - - /** - * The message ID will not be assigned until the ContentHeaderBody has arrived. - * @return - */ - public Long getMessageId() - { - return _message.getMessageId(); - } - - public void setExchange(final Exchange e) - { - _exchange = e; - } - - public void route() throws AMQException - { - _exchange.route(this); - } - - public void enqueue(final ArrayList<AMQQueue> queues) - { - _destinationQueues = queues; - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java deleted file mode 100644 index d91d45a446..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java +++ /dev/null @@ -1,302 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import java.io.IOException; - -import javax.management.JMException; -import javax.management.MBeanOperationInfo; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.TabularData; - -import org.apache.qpid.AMQException; -import org.apache.qpid.server.management.MBeanAttribute; -import org.apache.qpid.server.management.MBeanOperation; -import org.apache.qpid.server.management.MBeanOperationParameter; - -/** - * The management interface exposed to allow management of a queue. - * @author Robert J. Greig - * @author Bhupendra Bhardwaj - * @version 0.1 - */ -public interface ManagedQueue -{ - static final String TYPE = "Queue"; - static final int VERSION = 2; - - /** - * Returns the Name of the ManagedQueue. - * @return the name of the managedQueue. - * @throws IOException - */ - @MBeanAttribute(name="Name", description = TYPE + " Name") - String getName() throws IOException; - - /** - * Total number of messages on the queue, which are yet to be delivered to the consumer(s). - * @return number of undelivered message in the Queue. - * @throws IOException - */ - @MBeanAttribute(name="MessageCount", description = "Total number of undelivered messages on the queue") - Integer getMessageCount() throws IOException; - - /** - * Tells the total number of messages receieved by the queue since startup. - * @return total number of messages received. - * @throws IOException - */ - @MBeanAttribute(name="ReceivedMessageCount", description="The total number of messages receieved by the queue since startup") - Long getReceivedMessageCount() throws IOException; - - /** - * Size of messages in the queue - * @return - * @throws IOException - */ - @MBeanAttribute(name="QueueDepth", description="The total size(Bytes) of messages in the queue") - Long getQueueDepth() throws IOException, JMException; - - /** - * Returns the total number of active subscribers to the queue. - * @return the number of active subscribers - * @throws IOException - */ - @MBeanAttribute(name="ActiveConsumerCount", description="The total number of active subscribers to the queue") - Integer getActiveConsumerCount() throws IOException; - - /** - * Returns the total number of subscribers to the queue. - * @return the number of subscribers. - * @throws IOException - */ - @MBeanAttribute(name="ConsumerCount", description="The total number of subscribers to the queue") - Integer getConsumerCount() throws IOException; - - /** - * Tells the Owner of the ManagedQueue. - * @return the owner's name. - * @throws IOException - */ - @MBeanAttribute(name="Owner", description = "Owner") - String getOwner() throws IOException; - - /** - * Tells whether this ManagedQueue is durable or not. - * @return true if this ManagedQueue is a durable queue. - * @throws IOException - */ - @MBeanAttribute(name="Durable", description = "true if the AMQQueue is durable") - boolean isDurable() throws IOException; - - /** - * Tells if the ManagedQueue is set to AutoDelete. - * @return true if the ManagedQueue is set to AutoDelete. - * @throws IOException - */ - @MBeanAttribute(name="AutoDelete", description = "true if the AMQQueue is AutoDelete") - boolean isAutoDelete() throws IOException; - - /** - * Returns the maximum age of a message (expiration time) in milliseconds - * @return the maximum age - * @throws IOException - */ - Long getMaximumMessageAge() throws IOException; - - /** - * Sets the maximum age of a message in milliseconds - * @param age maximum age of message. - * @throws IOException - */ - @MBeanAttribute(name="MaximumMessageAge", description="Threshold high value(milliseconds) for message age") - void setMaximumMessageAge(Long age) throws IOException; - - /** - * Returns the maximum size of a message (in Bytes) allowed to be accepted by the - * ManagedQueue. This is useful in setting notifications or taking - * appropriate action, if the size of the message received is more than - * the allowed size. - * @return the maximum size of a message allowed to be aceepted by the - * ManagedQueue. - * @throws IOException - */ - Long getMaximumMessageSize() throws IOException; - - /** - * Sets the maximum size of the message (in Bytes) that is allowed to be - * accepted by the Queue. - * @param size maximum size of message. - * @throws IOException - */ - @MBeanAttribute(name="MaximumMessageSize", description="Threshold high value(Bytes) for a message size") - void setMaximumMessageSize(Long size) throws IOException; - - /** - * Tells the maximum number of messages that can be stored in the queue. - * This is useful in setting the notifications or taking required - * action is the number of message increase this limit. - * @return maximum muber of message allowed to be stored in the queue. - * @throws IOException - */ - Long getMaximumMessageCount() throws IOException; - - /** - * Sets the maximum number of messages allowed to be stored in the queue. - * @param value the maximum number of messages allowed to be stored in the queue. - * @throws IOException - */ - @MBeanAttribute(name="MaximumMessageCount", description="Threshold high value for number of undelivered messages in the queue") - void setMaximumMessageCount(Long value) throws IOException; - - /** - * This is useful for setting notifications or taking required action if the size of messages - * stored in the queue increases over this limit. - * @return threshold high value for Queue Depth - * @throws IOException - */ - Long getMaximumQueueDepth() throws IOException; - - /** - * Sets the maximum size of all the messages together, that can be stored - * in the queue. - * @param value - * @throws IOException - */ - @MBeanAttribute(name="MaximumQueueDepth", description="The threshold high value(Bytes) for Queue Depth") - void setMaximumQueueDepth(Long value) throws IOException; - - /** - * View the limit on the memory that this queue will utilise. - * - * Used by Flow to Disk. - * - * @return The maximum memory(B) that the queue will occuy. - */ - public Long getMemoryUsageMaximum(); - - /** - * Place a limit on the memory that this queue will utilise. - * - * Used by Flow to Disk - * - * @param maximumMemoryUsage The new maximum memory(B) to be used by this queue - */ - @MBeanAttribute(name="MemoryUsageMaximum", description="The maximum memory(Bytes) that the queue will occupy.") - public void setMemoryUsageMaximum(Long maximumMemoryUsage); - - /** - * View the minimum amount of memory that has been defined for this queue. - * - * Used by Flow to Disk - * - * @return The minimum amount of queue data(B) that the queue will attempt to keep in memory - */ - public Long getMemoryUsageMinimum(); - - /** - * Set the minimum amount of memory that has been defined for this queue. - * - * Used by Flow to Disk - * - * @param minimumMemoryUsage The new minimum memory(B) level to be used by this queue - */ - @MBeanAttribute(name="MemoryUsageMinimum", description="The minimum memory(Bytes) that the queue will occupy.") - public void setMemoryUsageMinimum(Long minimumMemoryUsage); - - /** - * View the amount of memory(B) that this queue is using. - * - * @return The current memory(B) usage of this queue. - */ - @MBeanAttribute(name="MemoryUsageCurrent", description="The current amount of memory(Bytes) used by this queue.") - public Long getMemoryUsageCurrent(); - - /** - * When a queue exceeds its MemoryUsageMaximum value then the Queue will start flowing to disk. - * - * This boolean is used to show that change in state. - * - * @return true if the Queue is currently flowing to disk - */ - @MBeanAttribute(name="isFlowed", description="true if the queue is currently flowing to disk.") - public boolean isFlowed(); - - - - //********** Operations *****************// - - - /** - * Returns a subset of all the messages stored in the queue. The messages - * are returned based on the given index numbers. - * @param fromIndex - * @param toIndex - * @return - * @throws IOException - * @throws JMException - */ - @MBeanOperation(name="viewMessages", - description="Message headers for messages in this queue within given index range. eg. from index 1 - 100") - TabularData viewMessages(@MBeanOperationParameter(name="from index", description="from index")int fromIndex, - @MBeanOperationParameter(name="to index", description="to index")int toIndex) - throws IOException, JMException, AMQException; - - @MBeanOperation(name="viewMessageContent", description="The message content for given Message Id") - CompositeData viewMessageContent(@MBeanOperationParameter(name="Message Id", description="Message Id")long messageId) - throws IOException, JMException; - - /** - * Deletes the first message from top. - * @throws IOException - * @throws JMException - */ - @MBeanOperation(name="deleteMessageFromTop", description="Deletes the first message from top", - impact= MBeanOperationInfo.ACTION) - void deleteMessageFromTop() throws IOException, JMException; - - /** - * Clears the queue by deleting all the undelivered messages from the queue. - * @throws IOException - * @throws JMException - */ - @MBeanOperation(name="clearQueue", - description="Clears the queue by deleting all the undelivered messages from the queue", - impact= MBeanOperationInfo.ACTION) - void clearQueue() throws IOException, JMException; - - /** - * Moves the messages in given range of message Ids to given Queue. QPID-170 - * @param fromMessageId first in the range of message ids - * @param toMessageId last in the range of message ids - * @param toQueue where the messages are to be moved - * @throws IOException - * @throws JMException - * @throws AMQException - */ - @MBeanOperation(name="moveMessages", - description="You can move messages to another queue from this queue ", - impact= MBeanOperationInfo.ACTION) - void moveMessages(@MBeanOperationParameter(name="from MessageId", description="from MessageId")long fromMessageId, - @MBeanOperationParameter(name="to MessageId", description="to MessageId")long toMessageId, - @MBeanOperationParameter(name= ManagedQueue.TYPE, description="to Queue Name")String toQueue) - throws IOException, JMException, AMQException; -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java deleted file mode 100644 index 090096d3c3..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; - -/** - * MessageCleanupException represents the failure to perform reference counting on messages correctly. This should not - * happen, but there may be programming errors giving race conditions that cause the reference counting to go wrong. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Signals that the reference count of a message has gone below zero. - * <tr><td> Indicates that a message store has lost a message which is still referenced. - * </table> - * - * @todo Not an AMQP exception as no status code. - * - * @todo The race conditions leading to this error should be cleaned up, and a runtime exception used instead. If the - * message store loses messages, then something is seriously wrong and it would be sensible to terminate the - * broker. This may be disguising out of memory errors. - */ -public class MessageCleanupException extends AMQException -{ - public MessageCleanupException(long messageId, AMQException e) - { - super("Failed to cleanup message with id " + messageId, e); - } - - public MessageCleanupException(String message) - { - super(message); - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java deleted file mode 100644 index 10e7dca18f..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.transactionlog.TransactionLog; - -import java.util.concurrent.atomic.AtomicLong; - -public class MessageFactory -{ - private AtomicLong _messageId; - private static MessageFactory INSTANCE; - - private enum State - { - RECOVER, - OPEN - } - - private State _state = State.RECOVER; - - private MessageFactory() - { - _messageId = new AtomicLong(0L); - } - - public void recoveryComplete() - { - _state = State.OPEN; - } - - /** - * Only to be used by tests as this will cause violate the principal that message IDs should not be reused. - */ - public void reset() - { - _state = State.RECOVER; - _messageId = new AtomicLong(0L); - } - - /** - * Normal message creation path - * @param transactionLog - * @param persistent - * @return - */ - public AMQMessage createMessage(TransactionLog transactionLog, boolean persistent) - { - if (_state != State.OPEN) - { - _state = State.OPEN; - } - - return createNextMessage(_messageId.incrementAndGet(), transactionLog, persistent); - } - - /** - * Used for message recovery only and so only creates persistent messages. - * @param messageId the id that this message must have - * @param transactionLog - * @return - */ - public AMQMessage createMessage(Long messageId, TransactionLog transactionLog) - { - if (_state != State.RECOVER) - { - throw new RuntimeException("Unable to create message by ID when not recovering"); - } - - if (messageId < 0L) - { - throw new RuntimeException("Message IDs can only be positive. Requested:" + messageId); - } - - _messageId.set((int)Math.max(messageId, _messageId.get())); - - return createNextMessage(messageId, transactionLog, true); - } - - private AMQMessage createNextMessage(Long messageId, TransactionLog transactionLog, boolean persistent) - { - if (persistent) - { - return new PersistentAMQMessage(messageId, transactionLog); - } - else - { - return new TransientAMQMessage(messageId); - } - } - - public static MessageFactory getInstance() - { - if (INSTANCE == null) - { - INSTANCE = new MessageFactory(); - } - - return INSTANCE; - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java deleted file mode 100644 index 6118a4c11f..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; - -/** - * Encapsulates a publish body and a content header. In the context of the message store these are treated as a - * single unit. - */ -public class MessageMetaData -{ - private MessagePublishInfo _messagePublishInfo; - - private ContentHeaderBody _contentHeaderBody; - - private int _contentChunkCount; - - private long _arrivalTime; - - public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount) - { - this(publishBody,contentHeaderBody, contentChunkCount, System.currentTimeMillis()); - } - - public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount, long arrivalTime) - { - _contentHeaderBody = contentHeaderBody; - _messagePublishInfo = publishBody; - _contentChunkCount = contentChunkCount; - _arrivalTime = arrivalTime; - } - - public int getContentChunkCount() - { - return _contentChunkCount; - } - - public void setContentChunkCount(int contentChunkCount) - { - _contentChunkCount = contentChunkCount; - } - - public ContentHeaderBody getContentHeaderBody() - { - return _contentHeaderBody; - } - - public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) - { - _contentHeaderBody = contentHeaderBody; - } - - public MessagePublishInfo getMessagePublishInfo() - { - return _messagePublishInfo; - } - - public void setMessagePublishInfo(MessagePublishInfo messagePublishInfo) - { - _messagePublishInfo = messagePublishInfo; - } - - public long getArrivalTime() - { - return _arrivalTime; - } - - public void setArrivalTime(long arrivalTime) - { - _arrivalTime = arrivalTime; - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java deleted file mode 100644 index d6fd1eec89..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.RequiredDeliveryException; - -/** - * NoConsumersException is a {@link RequiredDeliveryException} that represents the failure case where an immediate - * message cannot be delivered because there are presently no consumers for the message. The AMQP status code, 313, is - * always used to report this condition. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Represent failure to deliver a message that must be delivered. - * </table> - */ -public class NoConsumersException extends RequiredDeliveryException -{ - public NoConsumersException(AMQMessage message) - { - super("Immediate delivery is not possible.", message); - } - - public AMQConstant getReplyCode() - { - return AMQConstant.NO_CONSUMERS; - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java deleted file mode 100644 index a83d661de2..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java +++ /dev/null @@ -1,129 +0,0 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-public enum NotificationCheck
-{
-
- MESSAGE_COUNT_ALERT
- {
- boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue, QueueNotificationListener listener)
- {
- int msgCount;
- final long maximumMessageCount = queue.getMaximumMessageCount();
- if (maximumMessageCount!= 0 && (msgCount = queue.getMessageCount()) >= maximumMessageCount)
- {
- listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
- return true;
- }
- return false;
- }
- },
- MESSAGE_SIZE_ALERT(true)
- {
- boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue, QueueNotificationListener listener)
- {
- final long maximumMessageSize = queue.getMaximumMessageSize();
- if(maximumMessageSize != 0)
- {
- // Check for threshold message size
- long messageSize = (queueEntry == null) ? 0 : queueEntry.getSize();
-
- if (messageSize >= maximumMessageSize)
- {
- listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold (" +
- maximumMessageSize + ") breached. [Message ID=" +
- (queueEntry == null ? "null" : queueEntry.getMessageId()) + "]");
- return true;
- }
- }
- return false;
- }
-
- },
- QUEUE_DEPTH_ALERT
- {
- boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue, QueueNotificationListener listener)
- {
- // Check for threshold queue depth in bytes
- final long maximumQueueDepth = queue.getMaximumQueueDepth();
-
- if(maximumQueueDepth != 0)
- {
- final long queueDepth = queue.getQueueDepth();
-
- if (queueDepth >= maximumQueueDepth)
- {
- listener.notifyClients(this, queue, (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached.");
- return true;
- }
- }
- return false;
- }
-
- },
- MESSAGE_AGE_ALERT
- {
- boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue, QueueNotificationListener listener)
- {
-
- final long maxMessageAge = queue.getMaximumMessageAge();
- if(maxMessageAge != 0)
- {
- final long currentTime = System.currentTimeMillis();
- final long thresholdTime = currentTime - maxMessageAge;
- final long firstArrivalTime = queue.getOldestMessageArrivalTime();
-
- if(firstArrivalTime < thresholdTime)
- {
- long oldestAge = currentTime - firstArrivalTime;
- listener.notifyClients(this, queue, (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached.");
-
- return true;
- }
- }
- return false;
-
- }
-
- }
- ;
-
- private final boolean _messageSpecific;
-
- NotificationCheck()
- {
- this(false);
- }
-
- NotificationCheck(boolean messageSpecific)
- {
- _messageSpecific = messageSpecific;
- }
-
- public boolean isMessageSpecific()
- {
- return _messageSpecific;
- }
-
- abstract boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue, QueueNotificationListener listener);
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java deleted file mode 100644 index 9c644cc010..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.transactionlog.TransactionLog; - -public class PersistentAMQMessage extends TransientAMQMessage -{ - protected TransactionLog _transactionLog; - - public PersistentAMQMessage(Long messageId, TransactionLog transactionLog) - { - super(messageId); - _transactionLog = transactionLog; - } - - @Override - public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody) - throws AMQException - { - super.addContentBodyFrame(storeContext, contentChunk, isLastContentBody); - _transactionLog.storeContentBodyChunk(storeContext, _messageId, _contentBodies.size() - 1, - contentChunk, isLastContentBody); - } - - @Override - public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo, - ContentHeaderBody contentHeaderBody) - throws AMQException - { - super.setPublishAndContentHeaderBody(storeContext, messagePublishInfo, contentHeaderBody); - MessageMetaData mmd = new MessageMetaData(messagePublishInfo, contentHeaderBody, - _contentBodies == null ? 0 : _contentBodies.size(), _arrivalTime); - - _transactionLog.storeMessageMetaData(storeContext, _messageId, mmd); - } - - @Override - public boolean isPersistent() - { - return true; - } - - @Override - public void recoverContentBodyFrame(ContentChunk contentChunk, boolean isLastContentBody) throws AMQException - { - super.addContentBodyFrame(null, contentChunk, isLastContentBody); - } - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java deleted file mode 100644 index 83c7ebb4f2..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java +++ /dev/null @@ -1,178 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.queue; - -import org.apache.qpid.framing.CommonContentHeaderProperties; - -public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implements QueueEntryList -{ - private final AMQQueue _queue; - private final QueueEntryList[] _priorityLists; - private final int _priorities; - private final int _priorityOffset; - - public PriorityQueueEntryList(AMQQueue queue, int priorities) - { - super(queue); - _queue = queue; - _priorityLists = new QueueEntryList[priorities]; - _priorities = priorities; - _priorityOffset = 5 - ((priorities + 1) / 2); - for (int i = 0; i < priorities; i++) - { - _priorityLists[i] = new SimpleQueueEntryList(queue); - _priorityLists[i].setParentQueueEntryList(this); - } - - showUsage("Created:" + _queue.getName()); - } - - public int getPriorities() - { - return _priorities; - } - - public AMQQueue getQueue() - { - return _queue; - } - - public QueueEntry add(AMQMessage message) - { - int index = ((CommonContentHeaderProperties) ((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset; - if (index >= _priorities) - { - index = _priorities - 1; - } - else if (index < 0) - { - index = 0; - } - - return _priorityLists[index].add(message); - } - - - public QueueEntry next(QueueEntry node) - { - QueueEntryImpl nodeImpl = (QueueEntryImpl) node; - QueueEntry next = nodeImpl.getNext(); - - if (next == null) - { - QueueEntryList nodeEntryList = nodeImpl.getQueueEntryList(); - int index; - for (index = _priorityLists.length - 1; _priorityLists[index] != nodeEntryList; index--) - { - ; - } - - while (next == null && index != 0) - { - index--; - next = ((QueueEntryImpl) _priorityLists[index].getHead()).getNext(); - } - - } - return next; - } - - private final class PriorityQueueEntryListIterator implements QueueEntryIterator - { - private final QueueEntryIterator[] _iterators = new QueueEntryIterator[_priorityLists.length]; - private QueueEntry _lastNode; - - PriorityQueueEntryListIterator() - { - for (int i = 0; i < _priorityLists.length; i++) - { - _iterators[i] = _priorityLists[i].iterator(); - } - _lastNode = _iterators[_iterators.length - 1].getNode(); - } - - public boolean atTail() - { - for (int i = 0; i < _iterators.length; i++) - { - if (!_iterators[i].atTail()) - { - return false; - } - } - return true; - } - - public QueueEntry getNode() - { - return _lastNode; - } - - public boolean advance() - { - for (int i = _iterators.length - 1; i >= 0; i--) - { - if (_iterators[i].advance()) - { - _lastNode = _iterators[i].getNode(); - return true; - } - } - return false; - } - } - - public QueueEntryIterator iterator() - { - return new PriorityQueueEntryListIterator(); - } - - public QueueEntry getHead() - { - return _priorityLists[_priorities - 1].getHead(); - } - - static class Factory implements QueueEntryListFactory - { - private final int _priorities; - - Factory(int priorities) - { - _priorities = priorities; - } - - public QueueEntryList createQueueEntryList(AMQQueue queue) - { - return new PriorityQueueEntryList(queue, _priorities); - } - } - - - @Override - public void stop() - { - super.stop(); - for (QueueEntryList queueEntryList : _priorityLists) - { - queueEntryList.stop(); - } - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java deleted file mode 100644 index 5c65cb6424..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.commons.configuration.ConfigurationException; - -public interface QueueBackingStore -{ - /** - * Retrieve the message with a given ID - * - * This method must be thread safe. - * - * Multiple calls to load with a given messageId DO NOT need to return the same object. - * - * @param messageId the id of the message to retreive. - * @return - */ - AMQMessage load(Long messageId); - - /** - * Store a message in the BackingStore. - * - * This method must be thread safe understanding that multiple message objects may be the same data. - * - * Allowing a thread to return from this method means that it is safe to call load() - * - * Implementer guide: - * Until the message has been loaded the message references will all be the same object. - * - * One appraoch as taken by the @see FileQueueBackingStore is to block aimulataneous calls to this method - * until the message is fully on disk. This can be done by synchronising on message as initially it is always the - * same object. Only after a load has taken place will there be a discrepency. - * - * - * @param message the message to unload - * @throws UnableToFlowMessageException - */ - void unload(AMQMessage message) throws UnableToFlowMessageException; - - void delete(Long messageId); - - void close(); -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStoreFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStoreFactory.java deleted file mode 100644 index 3dd23a2f40..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStoreFactory.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.commons.configuration.ConfigurationException; - -public interface QueueBackingStoreFactory -{ - void configure(VirtualHost virtualHost, VirtualHostConfiguration config) throws ConfigurationException; - - public QueueBackingStore createBacking(AMQQueue queue); -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java deleted file mode 100644 index fb23edb3c5..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ /dev/null @@ -1,233 +0,0 @@ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.subscription.Subscription; - -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQException> -{ - public static enum State - { - AVAILABLE, - ACQUIRED, - EXPIRED, - DEQUEUED, - DELETED - } - - public static interface StateChangeListener - { - public void stateChanged(QueueEntry entry, State oldSate, State newState); - } - - public abstract class EntryState - { - private EntryState() - { - } - - public abstract State getState(); - } - - public final class AvailableState extends EntryState - { - - public State getState() - { - return State.AVAILABLE; - } - } - - public final class DequeuedState extends EntryState - { - - public State getState() - { - return State.DEQUEUED; - } - } - - public final class DeletedState extends EntryState - { - - public State getState() - { - return State.DELETED; - } - } - - public final class ExpiredState extends EntryState - { - - public State getState() - { - return State.EXPIRED; - } - } - - public final class NonSubscriptionAcquiredState extends EntryState - { - public State getState() - { - return State.ACQUIRED; - } - } - - public final class SubscriptionAcquiredState extends EntryState - { - private final Subscription _subscription; - - public SubscriptionAcquiredState(Subscription subscription) - { - _subscription = subscription; - } - - public State getState() - { - return State.ACQUIRED; - } - - public Subscription getSubscription() - { - return _subscription; - } - } - - final static EntryState AVAILABLE_STATE = new AvailableState(); - final static EntryState DELETED_STATE = new DeletedState(); - final static EntryState DEQUEUED_STATE = new DequeuedState(); - final static EntryState EXPIRED_STATE = new ExpiredState(); - final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState(); - - /** Flag to indicate that this message requires 'immediate' delivery. */ - - final static byte IMMEDIATE = 0x01; - - /** - * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality - * for messages published with the 'immediate' flag. - */ - - final static byte DELIVERED_TO_CONSUMER = 0x02; - - - AMQQueue getQueue(); - - AMQMessage getMessage(); - - Long getMessageId(); - - long getSize(); - - /** - * Called selectors to determin if the message has already been sent - * - * @return _deliveredToConsumer - */ - boolean getDeliveredToConsumer(); - - /** - * Checks to see if the message has expired. If it has the message is dequeued. - * - * @return true if the message has expire - * - * @throws org.apache.qpid.AMQException - */ - boolean expired() throws AMQException; - - public void setExpiration(final long expiration); - - boolean isAcquired(); - - boolean isAvailable(); - - boolean acquire(); - - boolean acquire(Subscription sub); - - boolean delete(); - - boolean isDeleted(); - - boolean acquiredBySubscription(); - - /** - * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). - * And for selector efficiency. - * - * This is now also used to unload the message if this entry is on a flowed queue. As a result this method should - * only be called after the message has been sent. - */ - void setDeliveredToSubscription(); - - void release(); - - String debugIdentity(); - - /** - * Called to enforce the 'immediate' flag. - * - * @returns true if the message is marked for immediate delivery but has not been marked as delivered - * to a consumer - */ - boolean immediateAndNotDelivered(); - - void setRedelivered(boolean b); - - Subscription getDeliveredSubscription(); - - void reject(); - - void reject(Subscription subscription); - - boolean isRejectedBy(Subscription subscription); - - void requeue(StoreContext storeContext) throws AMQException; - - void dequeue(final StoreContext storeContext) throws FailedDequeueException; - - /** - * Message has been ack so dequeueAndDelete it. - * If the message is persistent and this is the last QueueEntry that uses it then the data will be removed - * from the transaciton log - * - * @param storeContext the transactional Context in which to perform the deletion - * - * @throws FailedDequeueException - * @throws MessageCleanupException - */ - void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException; - - boolean isQueueDeleted(); - - void addStateChangeListener(StateChangeListener listener); - - boolean removeStateChangeListener(StateChangeListener listener); - - void unload(); - - AMQMessage load(); - - boolean isFlowed(); - -}
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java deleted file mode 100644 index e6223ef4ac..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ /dev/null @@ -1,568 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.subscription.Subscription; - -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - -public class QueueEntryImpl implements QueueEntry -{ - - /** Used for debugging purposes. */ - private static final Logger _log = Logger.getLogger(QueueEntryImpl.class); - - private final SimpleQueueEntryList _queueEntryList; - - private AtomicReference<AMQMessage> _messageRef; - - private boolean _redelivered; - - private Set<Subscription> _rejectedBy = null; - - private volatile EntryState _state = AVAILABLE_STATE; - - private static final - AtomicReferenceFieldUpdater<QueueEntryImpl, EntryState> - _stateUpdater = - AtomicReferenceFieldUpdater.newUpdater - (QueueEntryImpl.class, EntryState.class, "_state"); - - private volatile Set<StateChangeListener> _stateChangeListeners; - - private static final - AtomicReferenceFieldUpdater<QueueEntryImpl, Set> - _listenersUpdater = - AtomicReferenceFieldUpdater.newUpdater - (QueueEntryImpl.class, Set.class, "_stateChangeListeners"); - - private static final - AtomicLongFieldUpdater<QueueEntryImpl> - _entryIdUpdater = - AtomicLongFieldUpdater.newUpdater - (QueueEntryImpl.class, "_entryId"); - - private volatile long _entryId; - - volatile QueueEntryImpl _next; - - private long _messageSize; - private QueueBackingStore _backingStore; - private AtomicBoolean _flowed; - private Long _messageId; - - private byte _flags = 0; - - private long _expiration; - - private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER); - private boolean _persistent; - private boolean _hasBeenUnloaded = false; - - QueueEntryImpl(SimpleQueueEntryList queueEntryList) - { - this(queueEntryList, null, Long.MIN_VALUE); - _state = DELETED_STATE; - } - - public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId) - { - this(queueEntryList, message); - - _entryIdUpdater.set(this, entryId); - } - - public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message) - { - _queueEntryList = queueEntryList; - _messageRef = new AtomicReference<AMQMessage>(message); - if (message != null) - { - _messageId = message.getMessageId(); - _messageSize = message.getSize(); - - if (message.isImmediate()) - { - _flags |= IMMEDIATE; - } - _expiration = message.getExpiration(); - _persistent = message.isPersistent(); - } - _backingStore = queueEntryList.getBackingStore(); - _flowed = new AtomicBoolean(false); - } - - protected void setEntryId(long entryId) - { - _entryIdUpdater.set(this, entryId); - } - - protected long getEntryId() - { - return _entryId; - } - - public AMQQueue getQueue() - { - return _queueEntryList.getQueue(); - } - - public AMQMessage getMessage() - { - return load(); - } - - public Long getMessageId() - { - return _messageId; - } - - public long getSize() - { - return _messageSize; - } - - public boolean getDeliveredToConsumer() - { - return (_flags & DELIVERED_TO_CONSUMER) != 0; - } - - /** - * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). - * And for selector efficiency. - * - * This is now also used to unload the message if this entry is on a flowed queue. As a result this method should - * only be called after the message has been sent. - */ - public void setDeliveredToSubscription() - { - _flags |= DELIVERED_TO_CONSUMER; - - // We have delivered this message so we can unload it if we are flowed. - if (_queueEntryList.isFlowed()) - { - unload(); - } - } - - public boolean expired() throws AMQException - { - if (_expiration != 0L) - { - long now = System.currentTimeMillis(); - - return (now > _expiration); - } - - return false; - } - - public void setExpiration(final long expiration) - { - _expiration = expiration; - } - - public boolean isAcquired() - { - return _state.getState() == State.ACQUIRED; - } - - public boolean isAvailable() - { - return _state.getState() == State.AVAILABLE; - } - - public boolean acquire() - { - return acquire(NON_SUBSCRIPTION_ACQUIRED_STATE); - } - - private boolean acquire(final EntryState state) - { - boolean acquired = _stateUpdater.compareAndSet(this, AVAILABLE_STATE, state); - if (acquired && _stateChangeListeners != null) - { - notifyStateChange(State.AVAILABLE, State.ACQUIRED); - } - - return acquired; - } - - public boolean acquire(Subscription sub) - { - return acquire(sub.getOwningState()); - } - - public boolean acquiredBySubscription() - { - - return (_state instanceof SubscriptionAcquiredState); - } - - public void release() - { - _stateUpdater.set(this, AVAILABLE_STATE); - } - - public String debugIdentity() - { - String entry = "[State:" + _state.getState().name() + "]"; - - AMQMessage message = _messageRef.get(); - - if (message == null) - { - return entry + "(Message Unloaded ID:" + _messageId + ")"; - } - else - { - - return entry + message.debugIdentity(); - } - } - - public boolean immediateAndNotDelivered() - { - return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE; - } - - public ContentHeaderBody getContentHeaderBody() throws AMQException - { - return getMessage().getContentHeaderBody(); - } - - public boolean isPersistent() throws AMQException - { - return _persistent; - } - - public boolean isRedelivered() - { - return _redelivered; - } - - public void setRedelivered(boolean redelivered) - { - _redelivered = redelivered; - // todo - here we could record this message as redelivered on this queue in the transactionLog - // so we don't have to mark all messages on recover as redelivered. - } - - public Subscription getDeliveredSubscription() - { - EntryState state = _state; - if (state instanceof SubscriptionAcquiredState) - { - return ((SubscriptionAcquiredState) state).getSubscription(); - } - else - { - return null; - } - - } - - public void reject() - { - reject(getDeliveredSubscription()); - } - - public void reject(Subscription subscription) - { - if (subscription != null) - { - if (_rejectedBy == null) - { - _rejectedBy = new HashSet<Subscription>(); - } - - _rejectedBy.add(subscription); - } - else - { - _log.warn("Requesting rejection by null subscriber:" + debugIdentity()); - } - } - - public boolean isRejectedBy(Subscription subscription) - { - - if (_rejectedBy != null) // We have subscriptions that rejected this message - { - return _rejectedBy.contains(subscription); - } - else // This messasge hasn't been rejected yet. - { - return false; - } - } - - public void requeue(final StoreContext storeContext) throws AMQException - { - getQueue().requeue(storeContext, this); - if (_stateChangeListeners != null) - { - notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE); - } - } - - public void dequeue(final StoreContext storeContext) throws FailedDequeueException - { - EntryState state = _state; - - if ((state.getState() == State.ACQUIRED) && _stateUpdater.compareAndSet(this, state, DEQUEUED_STATE)) - { - if (state instanceof SubscriptionAcquiredState) - { - Subscription s = ((SubscriptionAcquiredState) state).getSubscription(); - s.restoreCredit(this); - } - - _queueEntryList.dequeued(this); - - getQueue().dequeue(storeContext, this); - - if (_stateChangeListeners != null) - { - notifyStateChange(state.getState(), QueueEntry.State.DEQUEUED); - } - } - } - - private void notifyStateChange(final State oldState, final State newState) - { - for (StateChangeListener l : _stateChangeListeners) - { - l.stateChanged(this, oldState, newState); - } - } - - public void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException - { - //if the queue is null (i.e. queue.delete()'d) then the message is waiting to be acked, but has already be delete()'d; - if (getQueue() != null) - { - dequeue(storeContext); - } - - delete(); - } - - public boolean isQueueDeleted() - { - return getQueue().isDeleted(); - } - - public void addStateChangeListener(StateChangeListener listener) - { - Set<StateChangeListener> listeners = _stateChangeListeners; - if (listeners == null) - { - _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener>()); - listeners = _stateChangeListeners; - } - - listeners.add(listener); - } - - public boolean removeStateChangeListener(StateChangeListener listener) - { - Set<StateChangeListener> listeners = _stateChangeListeners; - if (listeners != null) - { - return listeners.remove(listener); - } - - return false; - } - - public void unload() - { - //Get the currently loaded message - AMQMessage message = _messageRef.get(); - - // If we have a message in memory and we have a valid backingStore attempt to unload - if (message != null && _backingStore != null) - { - try - { - // The backingStore will now handle concurrent calls to unload and safely synchronize to ensure - // multiple initial unloads are unloads - _backingStore.unload(message); - _hasBeenUnloaded = true; - _messageRef.set(null); - - if (_log.isDebugEnabled()) - { - _log.debug("Unloaded:" + debugIdentity()); - } - - - // Clear the message reference if the loaded message is still the one we are processing. - - //Update the memoryState if this load call resulted in the message being purged from memory - if (!_flowed.getAndSet(true)) - { - _queueEntryList.entryUnloadedUpdateMemory(this); - } - - } - catch (UnableToFlowMessageException utfme) - { - // There is no recovery needed as the memory states remain unchanged. - if (_log.isDebugEnabled()) - { - _log.debug("Unable to Flow message:" + debugIdentity() + ", due to:" + utfme.getMessage()); - } - } - } - } - - public AMQMessage load() - { - // MessageId and Backing store are null in test scenarios, normally this is not the case. - if (_messageId != null && _backingStore != null) - { - // See if we have the message currently in memory to return - AMQMessage message = _messageRef.get(); - // if we don't then we need to start a load process. - if (message == null) - { - //Synchronize here to ensure only the first thread that attempts to load will perform the load from the - // backing store. - synchronized (this) - { - // Check again to see if someone else ahead of us loaded the message - message = _messageRef.get(); - // if we still don't have the message then we need to start a load process. - if (message == null) - { - // Load the message and keep a reference to it - message = _backingStore.load(_messageId); - // Set the message reference - _messageRef.set(message); - } - else - { - // If someone else loaded the message then we can jump out here as the Memory Updates will - // have been performed by the loading thread - return message; - } - } - - if (_log.isDebugEnabled()) - { - _log.debug("Loaded:" + debugIdentity()); - } - - //Update the memoryState if this load call resulted in the message comming in to memory - if (_flowed.getAndSet(false)) - { - _queueEntryList.entryLoadedUpdateMemory(this); - } - } - - // Return the message that was either already in memory or the value we just loaded. - return message; - } - // This can be null but only in the case where we have no messageId - // in the case where we have no backingStore then we will never have unloaded the message - return _messageRef.get(); - } - - public boolean isFlowed() - { - return _flowed.get(); - } - - public int compareTo(final QueueEntry o) - { - QueueEntryImpl other = (QueueEntryImpl) o; - return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0; - } - - public QueueEntryImpl getNext() - { - - QueueEntryImpl next = nextNode(); - while (next != null && next.isDeleted()) - { - - final QueueEntryImpl newNext = next.nextNode(); - if (newNext != null) - { - SimpleQueueEntryList._nextUpdater.compareAndSet(this, next, newNext); - next = nextNode(); - } - else - { - next = null; - } - - } - return next; - } - - QueueEntryImpl nextNode() - { - return _next; - } - - public boolean isDeleted() - { - return _state == DELETED_STATE; - } - - public boolean delete() - { - EntryState state = _state; - - if (state != DELETED_STATE && _stateUpdater.compareAndSet(this, state, DELETED_STATE)) - { - _queueEntryList.advanceHead(); - if (_backingStore != null && _hasBeenUnloaded) - { - _backingStore.delete(_messageId); - } - return true; - } - else - { - return false; - } - } - - public QueueEntryList getQueueEntryList() - { - return _queueEntryList; - } - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java deleted file mode 100644 index c5c115a2d1..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java +++ /dev/null @@ -1,30 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.queue; - -public interface QueueEntryIterator -{ - boolean atTail(); - - QueueEntry getNode(); - - boolean advance(); -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java deleted file mode 100644 index 2bbdf610de..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -public interface QueueEntryList -{ - AMQQueue getQueue(); - - QueueEntry add(AMQMessage message); - - QueueEntry next(QueueEntry node); - - QueueEntryIterator iterator(); - - QueueEntry getHead(); - - void setFlowed(boolean flowed); - - boolean isFlowed(); - - int size(); - - long dataSize(); - - long memoryUsed(); - - void setMemoryUsageMaximum(long maximumMemoryUsage); - - long getMemoryUsageMaximum(); - - void setMemoryUsageMinimum(long minimumMemoryUsage); - - long getMemoryUsageMinimum(); - - /** - * Immediately update memory usage based on the unload of this queueEntry, potentially start inhaler. - * @param queueEntry the entry that has been unloaded - */ - void entryUnloadedUpdateMemory(QueueEntry queueEntry); - - /** - * Immediately update memory usage based on the load of this queueEntry - * @param queueEntry the entry that has been loaded - */ - void entryLoadedUpdateMemory(QueueEntry queueEntry); - - void stop(); - - /** - * Mark this queue as part of another QueueEntryList for accounting purposes. - * - * All Calls from the QueueEntry to the QueueEntryList need to check if there is - * a parent QueueEntrylist upon which the action should take place. - * - * @param queueEntryList The parent queue that is performing accounting. - */ - void setParentQueueEntryList(FlowableBaseQueueEntryList queueEntryList); -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java deleted file mode 100644 index 4dbce45f67..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java +++ /dev/null @@ -1,26 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.queue; - -interface QueueEntryListFactory -{ - public QueueEntryList createQueueEntryList(AMQQueue queue); -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java deleted file mode 100644 index 959ca03c80..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java +++ /dev/null @@ -1,27 +0,0 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-
-public interface QueueNotificationListener
-{
- void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg);
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java deleted file mode 100644 index 1210f0e97c..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import java.util.Collection; - -public interface QueueRegistry -{ - VirtualHost getVirtualHost(); - - void registerQueue(AMQQueue queue) throws AMQException; - - void unregisterQueue(AMQShortString name) throws AMQException; - - AMQQueue getQueue(AMQShortString name); - - Collection<AMQShortString> getQueueNames(); - - Collection<AMQQueue> getQueues(); - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java deleted file mode 100644 index 6a19acddd7..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ /dev/null @@ -1,1666 +0,0 @@ -package org.apache.qpid.server.queue; - -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -import javax.management.JMException; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.pool.ReadWriteRunnable; -import org.apache.qpid.pool.ReferenceCountingExecutorService; -import org.apache.qpid.server.configuration.QueueConfiguration; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.output.ProtocolOutputConverter; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionList; -import org.apache.qpid.server.transactionlog.TransactionLog; -import org.apache.qpid.server.virtualhost.VirtualHost; - -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener -{ - private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class); - - private final AMQShortString _name; - - /** null means shared */ - private final AMQShortString _owner; - - private final boolean _durable; - - /** If true, this queue is deleted when the last subscriber is removed */ - private final boolean _autoDelete; - - private final VirtualHost _virtualHost; - - /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */ - private final ExchangeBindings _bindings = new ExchangeBindings(this); - - private final AtomicBoolean _deleted = new AtomicBoolean(false); - - private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>(); - - private final AtomicInteger _activeSubscriberCount = new AtomicInteger(); - - protected final SubscriptionList _subscriptionList = new SubscriptionList(this); - private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead()); - - private volatile Subscription _exclusiveSubscriber; - - protected final QueueEntryList _entries; - - private final AMQQueueMBean _managedObject; - private final Executor _asyncDelivery; - private final AtomicLong _totalMessagesReceived = new AtomicLong(); - - /** max allowed size(KB) of a single message */ - public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize(); - - /** max allowed number of messages on a queue. */ - public long _maximumMessageCount = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageCount(); - - /** max queue depth for the queue */ - public long _maximumQueueDepth = ApplicationRegistry.getInstance().getConfiguration().getMaximumQueueDepth(); - - /** maximum message age before alerts occur */ - public long _maximumMessageAge = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageAge(); - - /** the minimum interval between sending out consecutive alerts of the same type */ - public long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap(); - - - private static final int MAX_ASYNC_DELIVERIES = 10; - - private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class); - - private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE); - private AtomicReference _asynchronousRunner = new AtomicReference(null); - private AtomicInteger _deliveredMessages = new AtomicInteger(); - private AtomicBoolean _stopped = new AtomicBoolean(false); - - protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) - throws AMQException - { - this(name, durable, owner, autoDelete, virtualHost, new SimpleQueueEntryList.Factory()); - } - - protected SimpleAMQQueue(AMQShortString name, - boolean durable, - AMQShortString owner, - boolean autoDelete, - VirtualHost virtualHost, - QueueEntryListFactory entryListFactory) - throws AMQException - { - - if (name == null) - { - throw new IllegalArgumentException("Queue name must not be null"); - } - - if (virtualHost == null) - { - throw new IllegalArgumentException("Virtual Host must not be null"); - } - - _name = name; - _durable = durable; - _owner = owner; - _autoDelete = autoDelete; - _virtualHost = virtualHost; - _entries = entryListFactory.createQueueEntryList(this); - - _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); - - try - { - _managedObject = new AMQQueueMBean(this); - _managedObject.register(); - } - catch (JMException e) - { - throw new AMQException("AMQQueue MBean creation has failed ", e); - } - - resetNotifications(); - } - - public void resetNotifications() - { - // This ensure that the notification checks for the configured alerts are created. - setMaximumMessageAge(_maximumMessageAge); - setMaximumMessageCount(_maximumMessageCount); - setMaximumMessageSize(_maximumMessageSize); - setMaximumQueueDepth(_maximumQueueDepth); - } - - // ------ Getters and Setters - - public AMQShortString getName() - { - return _name; - } - - public boolean isDurable() - { - return _durable; - } - - public boolean isAutoDelete() - { - return _autoDelete; - } - - public boolean isFlowed() - { - return _entries.isFlowed(); - } - - public AMQShortString getOwner() - { - return _owner; - } - - public VirtualHost getVirtualHost() - { - return _virtualHost; - } - - // ------ bind and unbind - - public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException - { - exchange.registerQueue(routingKey, this, arguments); - if (isDurable() && exchange.isDurable()) - { - _virtualHost.getRoutingTable().bindQueue(exchange, routingKey, this, arguments); - } - - _bindings.addBinding(routingKey, arguments, exchange); - } - - public void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException - { - exchange.deregisterQueue(routingKey, this, arguments); - if (isDurable() && exchange.isDurable()) - { - _virtualHost.getRoutingTable().unbindQueue(exchange, routingKey, this, arguments); - } - - boolean removed = _bindings.remove(routingKey, arguments, exchange); - if (!removed) - { - _logger.error("Mismatch between queue bindings and exchange record of bindings"); - } - } - - public List<ExchangeBinding> getExchangeBindings() - { - return new ArrayList<ExchangeBinding>(_bindings.getExchangeBindings()); - } - - // ------ Manage Subscriptions - - public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException - { - - if (isExclusiveSubscriber()) - { - throw new ExistingExclusiveSubscription(); - } - - if (exclusive) - { - if (getConsumerCount() != 0) - { - throw new ExistingSubscriptionPreventsExclusive(); - } - else - { - _exclusiveSubscriber = subscription; - - } - } - - _activeSubscriberCount.incrementAndGet(); - subscription.setStateListener(this); - subscription.setLastSeenEntry(null, _entries.getHead()); - - if (!isDeleted()) - { - subscription.setQueue(this); - _subscriptionList.add(subscription); - if (isDeleted()) - { - subscription.queueDeleted(this); - } - } - else - { - // TODO - } - - deliverAsync(subscription); - - } - - public synchronized void unregisterSubscription(final Subscription subscription) throws AMQException - { - if (subscription == null) - { - throw new NullPointerException("subscription argument is null"); - } - - boolean removed = _subscriptionList.remove(subscription); - - if (removed) - { - subscription.close(); - // No longer can the queue have an exclusive consumer - setExclusiveSubscriber(null); - - QueueEntry lastSeen; - - while ((lastSeen = subscription.getLastSeenEntry()) != null) - { - subscription.setLastSeenEntry(lastSeen, null); - } - - // auto-delete queues must be deleted if there are no remaining subscribers - - if (_autoDelete && getConsumerCount() == 0) - { - if (_logger.isInfoEnabled()) - { - _logger.info("Auto-deleteing queue:" + this); - } - - delete(); - - // we need to manually fire the event to the removed subscription (which was the last one left for this - // queue. This is because the delete method uses the subscription set which has just been cleared - subscription.queueDeleted(this); - } - } - - } - - // ------ Enqueue / Dequeue - - public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException - { - _totalMessagesReceived.incrementAndGet(); - - QueueEntry entry; - Subscription exclusiveSub = _exclusiveSubscriber; - - if (exclusiveSub != null) - { - exclusiveSub.getSendLock(); - - try - { - entry = _entries.add(message); - - deliverToSubscription(exclusiveSub, entry); - - // where there is more than one producer there's a reasonable chance that even though there is - // no "queueing" we do not deliver because we get an interleving of _entries.add and - // deliverToSubscription between threads. Therefore have one more try. - if (!(entry.isAcquired() || entry.isDeleted())) - { - deliverToSubscription(exclusiveSub, entry); - } - } - finally - { - exclusiveSub.releaseSendLock(); - } - } - else - { - entry = _entries.add(message); - /* - - iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message - - */ - SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get(); - SubscriptionList.SubscriptionNode nextNode = node.getNext(); - if (nextNode == null) - { - nextNode = _subscriptionList.getHead().getNext(); - } - while (nextNode != null) - { - if (_lastSubscriptionNode.compareAndSet(node, nextNode)) - { - break; - } - else - { - node = _lastSubscriptionNode.get(); - nextNode = node.getNext(); - if (nextNode == null) - { - nextNode = _subscriptionList.getHead().getNext(); - } - } - } - - // always do one extra loop after we believe we've finished - // this catches the case where we *just* miss an update - int loops = 2; - - while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0) - { - if (nextNode == null) - { - loops--; - nextNode = _subscriptionList.getHead(); - } - else - { - // if subscription at end, and active, offer - Subscription sub = nextNode.getSubscription(); - deliverToSubscription(sub, entry); - } - nextNode = nextNode.getNext(); - - } - } - - if (entry.immediateAndNotDelivered()) - { - //We acquire the message here to ensure that the dequeueAndDelete will correctly remove the content - // from the transactionLog. This saves us from having to have a custom dequeueAndDelete that checks - // for the AVAILABLE state of an entry rather than the ACQUIRED state that it currently uses. - entry.acquire(); - entry.dequeueAndDelete(storeContext); - } - else if (!(entry.isAcquired() || entry.isDeleted())) - { - checkSubscriptionsNotAheadOfDelivery(entry); - - deliverAsync(); - } - - _managedObject.checkForNotification(entry); - - return entry; - } - - private void deliverToSubscription(final Subscription sub, final QueueEntry entry) - throws AMQException - { - - sub.getSendLock(); - try - { - if (subscriptionReadyAndHasInterest(sub, entry) - && !sub.isSuspended()) - { - if (!sub.wouldSuspend(entry)) - { - if (!sub.isBrowser() && !entry.acquire(sub)) - { - // restore credit here that would have been taken away by wouldSuspend since we didn't manage - // to acquire the entry for this subscription - sub.restoreCredit(entry); - } - else - { - - deliverMessage(sub, entry); - - } - } - } - } - finally - { - sub.releaseSendLock(); - } - } - - protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry) - { - // This method is only required for queues which mess with ordering - // Simple Queues don't :-) - } - - private void deliverMessage(final Subscription sub, final QueueEntry entry) - throws AMQException - { - _deliveredMessages.incrementAndGet(); - - sub.send(entry); - - } - - private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) - { - - // We need to move this subscription on, past entries which are already acquired, or deleted or ones it has no - // interest in. - QueueEntry node = sub.getLastSeenEntry(); - while (node != null && (node.isAcquired() || node.isDeleted() || !sub.hasInterest(node))) - { - - QueueEntry newNode = _entries.next(node); - if (newNode != null) - { - sub.setLastSeenEntry(node, newNode); - node = sub.getLastSeenEntry(); - } - else - { - node = null; - break; - } - - } - - if (node == entry) - { - // If the first entry that subscription can process is the one we are trying to deliver to it, then we are - // good - return true; - } - else - { - // Otherwise we should try to update the subscription's last seen entry to the entry we got to, providing - // no-one else has updated it to something furhter on in the list - //TODO - check - //updateLastSeenEntry(sub, entry); - return false; - } - - } - - private void updateLastSeenEntry(final Subscription sub, final QueueEntry entry) - { - QueueEntry node = sub.getLastSeenEntry(); - - if (node != null && entry.compareTo(node) < 0 && sub.hasInterest(entry)) - { - do - { - if (sub.setLastSeenEntry(node, entry)) - { - return; - } - else - { - node = sub.getLastSeenEntry(); - } - } - while (node != null && entry.compareTo(node) < 0); - } - - } - - public void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException - { - - SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); - // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards - while (subscriberIter.advance()) - { - Subscription sub = subscriberIter.getNode().getSubscription(); - - // we don't make browsers send the same stuff twice - if (!sub.isBrowser()) - { - updateLastSeenEntry(sub, entry); - } - } - - deliverAsync(); - - } - - /** - * Only call from queue Entry - * @param storeContext - * @param entry - * @throws FailedDequeueException - */ - public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException - { - if (entry.acquiredBySubscription()) - { - _deliveredMessages.decrementAndGet(); - } - - try - { - if (entry.isPersistent()) - { - _virtualHost.getTransactionLog().dequeueMessage(storeContext, this, entry.getMessageId()); - } - - } - catch (MessageCleanupException e) - { - // Message was dequeued, but could not then be deleted - // though it is no longer referenced. This should be very - // rare and can be detected and cleaned up on recovery or - // done through some form of manual intervention. - _logger.error(e, e); - } - catch (AMQException e) - { - throw new FailedDequeueException(_name.toString(), e); - } - - } - - - public boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException - { - /* TODO : This is wrong as the subscription may be suspended, we should instead change the state of the message - entry to resend and move back the subscription pointer. */ - - subscription.getSendLock(); - try - { - if (!subscription.isClosed()) - { - deliverMessage(subscription, entry); - return true; - } - else - { - return false; - } - } - finally - { - subscription.releaseSendLock(); - } - } - - public int getConsumerCount() - { - return _subscriptionList.size(); - } - - public int getActiveConsumerCount() - { - return _activeSubscriberCount.get(); - } - - public boolean isUnused() - { - return getConsumerCount() == 0; - } - - public boolean isEmpty() - { - return getMessageCount() == 0; - } - - public long getMemoryUsageCurrent() - { - return getQueueInMemory(); - } - - public int getMessageCount() - { - return getQueueCount(); - } - - public long getQueueDepth() - { - return getQueueSize(); - } - - public int getUndeliveredMessageCount() - { - int count = getMessageCount() - _deliveredMessages.get(); - if (count < 0) - { - return 0; - } - else - { - return count; - } - } - - public long getReceivedMessageCount() - { - return _totalMessagesReceived.get(); - } - - public long getOldestMessageArrivalTime() - { - QueueEntry entry = getOldestQueueEntry(); - return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime(); - } - - protected QueueEntry getOldestQueueEntry() - { - return _entries.next(_entries.getHead()); - } - - public boolean isDeleted() - { - return _deleted.get(); - } - - public List<QueueEntry> getMessagesOnTheQueue() - { - ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>(); - QueueEntryIterator queueListIterator = _entries.iterator(); - while (queueListIterator.advance()) - { - QueueEntry node = queueListIterator.getNode(); - if (node != null && !node.isDeleted()) - { - entryList.add(node); - } - } - return entryList; - - } - - public void stateChange(Subscription sub, Subscription.State oldState, Subscription.State newState) - { - if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE) - { - _activeSubscriberCount.decrementAndGet(); - - } - else if (newState == Subscription.State.ACTIVE) - { - if (oldState != Subscription.State.ACTIVE) - { - _activeSubscriberCount.incrementAndGet(); - - } - deliverAsync(sub); - } - } - - public int compareTo(final AMQQueue o) - { - return _name.compareTo(o.getName()); - } - - public int getQueueCount() - { - return _entries.size(); - } - - public long getQueueSize() - { - return _entries.dataSize(); - } - - public long getQueueInMemory() - { - return _entries.memoryUsed(); - } - - private boolean isExclusiveSubscriber() - { - return _exclusiveSubscriber != null; - } - - private void setExclusiveSubscriber(Subscription exclusiveSubscriber) - { - _exclusiveSubscriber = exclusiveSubscriber; - } - - public static interface QueueEntryFilter - { - public boolean accept(QueueEntry entry); - - public boolean filterComplete(); - } - - public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId) - { - return getMessagesOnTheQueue(new QueueEntryFilter() - { - - public boolean accept(QueueEntry entry) - { - final long messageId = entry.getMessageId(); - return messageId >= fromMessageId && messageId <= toMessageId; - } - - public boolean filterComplete() - { - return false; - } - }); - } - - public QueueEntry getMessageOnTheQueue(final long messageId) - { - List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() - { - private boolean _complete; - - public boolean accept(QueueEntry entry) - { - _complete = entry.getMessageId() == messageId; - return _complete; - } - - public boolean filterComplete() - { - return _complete; - } - }); - return entries.isEmpty() ? null : entries.get(0); - } - - public List<QueueEntry> getMessagesOnTheQueue(QueueEntryFilter filter) - { - ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>(); - QueueEntryIterator queueListIterator = _entries.iterator(); - while (queueListIterator.advance() && !filter.filterComplete()) - { - QueueEntry node = queueListIterator.getNode(); - if (!node.isDeleted() && filter.accept(node)) - { - entryList.add(node); - } - } - return entryList; - - } - - - public void moveMessagesToAnotherQueue(final long fromMessageId, - final long toMessageId, - String queueName, - StoreContext storeContext) - { - // The move is a two step process. First the messages are moved in the _transactionLog. - // That is persistent messages are moved queues on disk for recovery and the QueueEntries removed from the - // existing queue. - // This is done as Queue.enqueue() does not write the data to the transactionLog. In normal message delivery - // this is done as the message is recieved. - // So The final step is to enqueue the messages on the new queue. - - AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); - TransactionLog transactionLog = getVirtualHost().getTransactionLog(); - - if (toQueue.equals(this)) - { - //nothing to do here, message is already at the requested destination - return; - } - - List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() - { - - public boolean accept(QueueEntry entry) - { - final long messageId = entry.getMessageId(); - return (messageId >= fromMessageId) - && (messageId <= toMessageId) - && entry.acquire(); - } - - public boolean filterComplete() - { - return false; - } - }); - - try - { - transactionLog.beginTran(storeContext); - - // Move the messages in the transaction log. - for (QueueEntry entry : entries) - { - if (entry.isPersistent()) - { - //FIXME - //fixme - - // Creating a list with the destination queue AND the current queue. - // This is a hack to ensure a reference is kept in the TLog to the new destination when dequeing - // the old destination below, thus preventing incorrect removal of the message from the store - ArrayList<AMQQueue> list = new ArrayList<AMQQueue>(); - list.add(toQueue); - list.add(this); - transactionLog.enqueueMessage(storeContext, list, entry.getMessageId()); - } - // dequeue will remove the messages from the queue - entry.dequeue(storeContext); - } - - // Commit and flush the move transactions. - try - { - transactionLog.commitTran(storeContext); - } - catch (AMQException e) - { - throw new RuntimeException("Failed to commit transaction whilst moving messages on transaction log.", e); - } - } - catch (AMQException e) - { - try - { - transactionLog.abortTran(storeContext); - } - catch (AMQException rollbackEx) - { - _logger.error("Failed to rollback transaction when error occured moving messages", rollbackEx); - } - throw new RuntimeException(e); - } - - try - { - // Add messages to new queue - for (QueueEntry entry : entries) - { - toQueue.enqueue(storeContext, entry.getMessage()); - // As we only did a dequeue above now that we have moved the message we should perform a delete. - // We cannot do this earlier as the message will be lost if flowed. - entry.delete(); - } - } - catch (MessageCleanupException e) - { - throw new RuntimeException(e); - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - - } - - public void copyMessagesToAnotherQueue(final long fromMessageId, - final long toMessageId, - String queueName, - final StoreContext storeContext) - { - AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); - TransactionLog transactionLog = getVirtualHost().getTransactionLog(); - - if (toQueue.equals(this)) - { - //nothing to do here, message is already at the requested destination - return; - } - - List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() - { - - public boolean accept(QueueEntry entry) - { - final long messageId = entry.getMessageId(); - if ((messageId >= fromMessageId) - && (messageId <= toMessageId)) - { - if (!entry.isDeleted()) - { - return true; - } - } - - return false; - } - - public boolean filterComplete() - { - return false; - } - }); - - try - { - transactionLog.beginTran(storeContext); - - // Move the messages in on the transaction log. - for (QueueEntry entry : entries) - { - if (!entry.isDeleted() && entry.isPersistent()) - { - //fixme - //FIXME - - // Creating a list with the destination queue AND the current queue. - // This is a hack to ensure a reference is kept in the TLog to the old destination when enqueing - ArrayList list = new ArrayList(); - list.add(this); - list.add(toQueue); - transactionLog.enqueueMessage(storeContext, list, entry.getMessageId()); - } - } - - - // Commit and flush the move transcations. - try - { - transactionLog.commitTran(storeContext); - } - catch (AMQException e) - { - throw new RuntimeException("Failed to commit transaction whilst moving messages on transaction log.", e); - } - } - catch (AMQException e) - { - try - { - transactionLog.abortTran(storeContext); - } - catch (AMQException rollbackEx) - { - _logger.error("Failed to rollback transaction when error occured moving messages", rollbackEx); - } - throw new RuntimeException(e); - } - - try - { - for (QueueEntry entry : entries) - { - if (!entry.isDeleted()) - { - toQueue.enqueue(storeContext, entry.getMessage()); - } - } - } - catch (MessageCleanupException e) - { - throw new RuntimeException(e); - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - - } - - public void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext) - { - - try - { - QueueEntryIterator queueListIterator = _entries.iterator(); - - while (queueListIterator.advance()) - { - QueueEntry node = queueListIterator.getNode(); - - final long messageId = node.getMessageId(); - - if ((messageId >= fromMessageId) - && (messageId <= toMessageId) - && !node.isDeleted() - && node.acquire()) - { - node.dequeueAndDelete(storeContext); - } - - } - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - - } - - // ------ Management functions - - public void deleteMessageFromTop(StoreContext storeContext) throws AMQException - { - QueueEntryIterator queueListIterator = _entries.iterator(); - boolean noDeletes = true; - - while (noDeletes && queueListIterator.advance()) - { - QueueEntry node = queueListIterator.getNode(); - if (!node.isDeleted() && node.acquire()) - { - node.dequeueAndDelete(storeContext); - noDeletes = false; - } - - } - } - - public long clearQueue(StoreContext storeContext) throws AMQException - { - - QueueEntryIterator queueListIterator = _entries.iterator(); - long count = 0; - - while (queueListIterator.advance()) - { - QueueEntry node = queueListIterator.getNode(); - if (!node.isDeleted() && node.acquire()) - { - node.dequeueAndDelete(storeContext); - count++; - } - - } - return count; - - } - - public void addQueueDeleteTask(final Task task) - { - _deleteTaskList.add(task); - } - - public int delete() throws AMQException - { - if (!_deleted.getAndSet(true)) - { - - SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator(); - - while (subscriptionIter.advance()) - { - Subscription s = subscriptionIter.getNode().getSubscription(); - if (s != null) - { - s.queueDeleted(this); - } - } - - _bindings.deregister(); - _virtualHost.getQueueRegistry().unregisterQueue(_name); - - _managedObject.unregister(); - for (Task task : _deleteTaskList) - { - task.doTask(this); - } - - _deleteTaskList.clear(); - stop(); - } - return getMessageCount(); - - } - - public void stop() - { - if (!_stopped.getAndSet(true)) - { - ReferenceCountingExecutorService.getInstance().releaseExecutorService(); - _entries.stop(); - } - } - - public void deliverAsync() - { - _stateChangeCount.incrementAndGet(); - - Runner runner = new Runner(); - - if (_asynchronousRunner.compareAndSet(null, runner)) - { - _asyncDelivery.execute(runner); - } - } - - public void deliverAsync(Subscription sub) - { - _asyncDelivery.execute(new SubFlushRunner(sub)); - } - - private class Runner implements ReadWriteRunnable - { - public void run() - { - try - { - processQueue(this); - } - catch (AMQException e) - { - _logger.error(e); - } - - } - - public boolean isRead() - { - return false; - } - - public boolean isWrite() - { - return true; - } - } - - private class SubFlushRunner implements ReadWriteRunnable - { - private final Subscription _sub; - - public SubFlushRunner(Subscription sub) - { - _sub = sub; - } - - public void run() - { - boolean complete = false; - try - { - complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES)); - - } - catch (AMQException e) - { - _logger.error(e); - } - if (!complete && !_sub.isSuspended()) - { - _asyncDelivery.execute(this); - } - - } - - public boolean isRead() - { - return false; - } - - public boolean isWrite() - { - return true; - } - } - - public void flushSubscription(Subscription sub) throws AMQException - { - flushSubscription(sub, Long.MAX_VALUE); - } - - public boolean flushSubscription(Subscription sub, Long iterations) throws AMQException - { - boolean atTail = false; - - while (!sub.isSuspended() && !atTail && iterations != 0) - { - try - { - sub.getSendLock(); - atTail = attemptDelivery(sub); - if (atTail && sub.isAutoClose()) - { - unregisterSubscription(sub); - - ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter(); - converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag()); - } - else if (!atTail) - { - iterations--; - } - } - finally - { - sub.releaseSendLock(); - } - } - - // if there's (potentially) more than one subscription the others will potentially not have been advanced to the - // next entry they are interested in yet. This would lead to holding on to references to expired messages, etc - // which would give us memory "leak". - - if (!isExclusiveSubscriber()) - { - advanceAllSubscriptions(); - } - return atTail; - } - - private boolean attemptDelivery(Subscription sub) throws AMQException - { - boolean atTail = false; - boolean advanced = false; - boolean subActive = sub.isActive(); - if (subActive) - { - QueueEntry node = moveSubscriptionToNextNode(sub); - if (!(node.isAcquired() || node.isDeleted())) - { - if (!sub.isSuspended()) - { - if (sub.hasInterest(node)) - { - if (!sub.wouldSuspend(node)) - { - if (!sub.isBrowser() && !node.acquire(sub)) - { - sub.restoreCredit(node); - } - else - { - deliverMessage(sub, node); - - if (sub.isBrowser()) - { - QueueEntry newNode = _entries.next(node); - - if (newNode != null) - { - advanced = true; - sub.setLastSeenEntry(node, newNode); - node = sub.getLastSeenEntry(); - } - } - } - - } - else // Not enough Credit for message and wouldSuspend - { - //QPID-1187 - Treat the subscription as suspended for this message - // and wait for the message to be removed to continue delivery. - subActive = false; - node.addStateChangeListener(new QueueEntryListener(sub, node)); - } - } - else - { - // this subscription is not interested in this node so we can skip over it - QueueEntry newNode = _entries.next(node); - if (newNode != null) - { - sub.setLastSeenEntry(node, newNode); - } - } - } - - } - atTail = (_entries.next(node) == null) && !advanced; - } - return atTail || !subActive; - } - - protected void advanceAllSubscriptions() throws AMQException - { - SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); - while (subscriberIter.advance()) - { - SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode(); - Subscription sub = subNode.getSubscription(); - moveSubscriptionToNextNode(sub); - } - } - - private QueueEntry moveSubscriptionToNextNode(final Subscription sub) - throws AMQException - { - QueueEntry node = sub.getLastSeenEntry(); - - while (node != null && (node.isAcquired() || node.isDeleted() || node.expired())) - { - if (!node.isAcquired() && !node.isDeleted() && node.expired()) - { - if (node.acquire()) - { - // creating a new final store context per message seems wasteful. - final StoreContext reapingStoreContext = new StoreContext(); - node.dequeueAndDelete(reapingStoreContext); - } - } - QueueEntry newNode = _entries.next(node); - if (newNode != null) - { - sub.setLastSeenEntry(node, newNode); - node = sub.getLastSeenEntry(); - } - else - { - break; - } - - } - return node; - } - - private void processQueue(Runnable runner) throws AMQException - { - long stateChangeCount; - long previousStateChangeCount = Long.MIN_VALUE; - boolean deliveryIncomplete = true; - - int extraLoops = 1; - Long iterations = new Long(MAX_ASYNC_DELIVERIES); - - _asynchronousRunner.compareAndSet(runner, null); - - while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner)) - { - // we want to have one extra loop after every subscription has reached the point where it cannot move - // further, just in case the advance of one subscription in the last loop allows a different subscription to - // move forward in the next iteration - - if (previousStateChangeCount != stateChangeCount) - { - extraLoops = 1; - } - - previousStateChangeCount = stateChangeCount; - deliveryIncomplete = _subscriptionList.size() != 0; - boolean done = true; - - SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator(); - //iterate over the subscribers and try to advance their pointer - while (subscriptionIter.advance()) - { - boolean closeConsumer = false; - Subscription sub = subscriptionIter.getNode().getSubscription(); - sub.getSendLock(); - try - { - if (sub != null) - { - - QueueEntry node = moveSubscriptionToNextNode(sub); - if (node != null) - { - done = attemptDelivery(sub); - } - } - if (done) - { - if (extraLoops == 0) - { - deliveryIncomplete = false; - if (sub.isAutoClose()) - { - unregisterSubscription(sub); - - ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter(); - converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag()); - } - } - else - { - extraLoops--; - } - } - else - { - iterations--; - extraLoops = 1; - } - } - finally - { - sub.releaseSendLock(); - } - } - _asynchronousRunner.set(null); - } - - // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit - // therefore we should schedule this runner again (unless someone beats us to it :-) ). - if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner)) - { - _asyncDelivery.execute(runner); - } - } - - - public void checkMessageStatus() throws AMQException - { - - final StoreContext storeContext = new StoreContext(); - - QueueEntryIterator queueListIterator = _entries.iterator(); - - while (queueListIterator.advance()) - { - QueueEntry node = queueListIterator.getNode(); - if (!node.isDeleted() && node.expired() && node.acquire()) - { - node.dequeueAndDelete(storeContext); - } - else - { - _managedObject.checkForNotification(node); - } - } - - } - - - public long getMemoryUsageMaximum() - { - return _entries.getMemoryUsageMaximum(); - } - - public void setMemoryUsageMaximum(long maximumMemoryUsage) - { - _entries.setMemoryUsageMaximum(maximumMemoryUsage); - } - - public long getMemoryUsageMinimum() - { - return _entries.getMemoryUsageMinimum(); - } - - public void setMemoryUsageMinimum(long minimumMemoryUsage) - { - _entries.setMemoryUsageMinimum(minimumMemoryUsage); - } - - public long getMinimumAlertRepeatGap() - { - return _minimumAlertRepeatGap; - } - - public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap) - { - _minimumAlertRepeatGap = minimumAlertRepeatGap; - } - - public long getMaximumMessageAge() - { - return _maximumMessageAge; - } - - public void setMaximumMessageAge(long maximumMessageAge) - { - _maximumMessageAge = maximumMessageAge; - if (maximumMessageAge == 0L) - { - _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT); - } - else - { - _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT); - } - } - - public long getMaximumMessageCount() - { - return _maximumMessageCount; - } - - public void setMaximumMessageCount(final long maximumMessageCount) - { - _maximumMessageCount = maximumMessageCount; - if (maximumMessageCount == 0L) - { - _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT); - } - else - { - _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT); - } - - } - - public long getMaximumQueueDepth() - { - return _maximumQueueDepth; - } - - // Sets the queue depth, the max queue size - public void setMaximumQueueDepth(final long maximumQueueDepth) - { - _maximumQueueDepth = maximumQueueDepth; - if (maximumQueueDepth == 0L) - { - _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT); - } - else - { - _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT); - } - - } - - public long getMaximumMessageSize() - { - return _maximumMessageSize; - } - - public void setMaximumMessageSize(final long maximumMessageSize) - { - _maximumMessageSize = maximumMessageSize; - if (maximumMessageSize == 0L) - { - _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT); - } - else - { - _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT); - } - } - - public Set<NotificationCheck> getNotificationChecks() - { - return _notificationChecks; - } - - public ManagedObject getManagedObject() - { - return _managedObject; - } - - private final class QueueEntryListener implements QueueEntry.StateChangeListener - { - private final QueueEntry _entry; - private final Subscription _sub; - - public QueueEntryListener(final Subscription sub, final QueueEntry entry) - { - _entry = entry; - _sub = sub; - } - - public boolean equals(Object o) - { - return _entry == ((QueueEntryListener) o)._entry && _sub == ((QueueEntryListener) o)._sub; - } - - public int hashCode() - { - return System.identityHashCode(_entry) ^ System.identityHashCode(_sub); - } - - public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState) - { - entry.removeStateChangeListener(this); - deliverAsync(_sub); - } - } - - public List<Long> getMessagesOnTheQueue(int num) - { - return getMessagesOnTheQueue(num, 0); - } - - public List<Long> getMessagesOnTheQueue(int num, int offset) - { - ArrayList<Long> ids = new ArrayList<Long>(num); - QueueEntryIterator it = _entries.iterator(); - for (int i = 0; i < offset; i++) - { - it.advance(); - } - - for (int i = 0; i < num && !it.atTail(); i++) - { - it.advance(); - ids.add(it.getNode().getMessageId()); - } - return ids; - } - - - public String getType() - { - return getClass().getSimpleName() + "[" + getName() +"]"; - } - - public String toString() - { - return getType() + "[Owner:" + _owner + "][Durable:" + _durable + "]"; - } - - public void configure(QueueConfiguration config) - { - if (config != null) - { - setMaximumMessageAge(config.getMaximumMessageAge()); - setMaximumQueueDepth(config.getMaximumQueueDepth()); - setMaximumMessageSize(config.getMaximumMessageSize()); - setMaximumMessageCount(config.getMaximumMessageCount()); - setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap()); - setMemoryUsageMaximum(config.getMemoryUsageMaximum()); - setMemoryUsageMinimum(config.getMemoryUsageMinimum()); - } - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java deleted file mode 100644 index a10e332ef5..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java +++ /dev/null @@ -1,181 +0,0 @@ -package org.apache.qpid.server.queue; - -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -public class SimpleQueueEntryList extends FlowableBaseQueueEntryList -{ - - private final QueueEntryImpl _head; - - private volatile QueueEntryImpl _tail; - - static final AtomicReferenceFieldUpdater<SimpleQueueEntryList, QueueEntryImpl> - _tailUpdater = - AtomicReferenceFieldUpdater.newUpdater - (SimpleQueueEntryList.class, QueueEntryImpl.class, "_tail"); - - - private final AMQQueue _queue; - - static final AtomicReferenceFieldUpdater<QueueEntryImpl, QueueEntryImpl> - _nextUpdater = - AtomicReferenceFieldUpdater.newUpdater - (QueueEntryImpl.class, QueueEntryImpl.class, "_next"); - - public SimpleQueueEntryList(AMQQueue queue) - { - super(queue); - _queue = queue; - _head = new QueueEntryImpl(this); - _tail = _head; - } - - void advanceHead() - { - QueueEntryImpl head = _head.nextNode(); - while(head._next != null && head.isDeleted()) - { - - final QueueEntryImpl newhead = head.nextNode(); - if(newhead != null) - { - _nextUpdater.compareAndSet(_head,head, newhead); - } - head = _head.nextNode(); - } - } - - - public AMQQueue getQueue() - { - return _queue; - } - - - public QueueEntry add(AMQMessage message) - { - QueueEntryImpl node = new QueueEntryImpl(this, message); - - incrementCounters(node); - - for (;;) - { - QueueEntryImpl tail = _tail; - QueueEntryImpl next = tail.nextNode(); - if (tail == _tail) - { - if (next == null) - { - node.setEntryId(tail.getEntryId()+1); - if (_nextUpdater.compareAndSet(tail, null, node)) - { - _tailUpdater.compareAndSet(this, tail, node); - - return node; - } - } - else - { - _tailUpdater.compareAndSet(this,tail, next); - } - } - } - } - - - public QueueEntry next(QueueEntry node) - { - return ((QueueEntryImpl)node).getNext(); - } - - public class QueueEntryIteratorImpl implements QueueEntryIterator - { - - private QueueEntryImpl _lastNode; - - QueueEntryIteratorImpl(QueueEntryImpl startNode) - { - _lastNode = startNode; - } - - - public boolean atTail() - { - return _lastNode.nextNode() == null; - } - - public QueueEntry getNode() - { - - return _lastNode; - - } - - public boolean advance() - { - - if(!atTail()) - { - QueueEntryImpl nextNode = _lastNode.nextNode(); - while(nextNode.isDeleted() && nextNode.nextNode() != null) - { - nextNode = nextNode.nextNode(); - } - _lastNode = nextNode; - return true; - - } - else - { - return false; - } - - } - - } - - - public QueueEntryIterator iterator() - { - return new QueueEntryIteratorImpl(_head); - } - - - public QueueEntry getHead() - { - return _head; - } - - static class Factory implements QueueEntryListFactory - { - - public QueueEntryList createQueueEntryList(AMQQueue queue) - { - return new SimpleQueueEntryList(queue); - } - - } - - - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java deleted file mode 100644 index 4c9fe81439..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java +++ /dev/null @@ -1,332 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQBody; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.store.StoreContext; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -/** A deliverable message. */ -public class TransientAMQMessage implements AMQMessage -{ - /** Used for debugging purposes. */ - protected static final Logger _log = Logger.getLogger(AMQMessage.class); - - protected ContentHeaderBody _contentHeaderBody; - - protected MessagePublishInfo _messagePublishInfo; - - protected List<ContentChunk> _contentBodies; - - protected long _arrivalTime; - - protected final Long _messageId; - - - private byte _flags = 0; - - private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier; - private long _expiration; - - /** - * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory - * therefore is memory-efficient. - */ - private class BodyFrameIterator implements Iterator<AMQDataBlock> - { - private int _channel; - - private int _index = -1; - private AMQProtocolSession _protocolSession; - - private BodyFrameIterator(AMQProtocolSession protocolSession, int channel) - { - _channel = channel; - _protocolSession = protocolSession; - } - - public boolean hasNext() - { - return _index < (getBodyCount() - 1); - } - - public AMQDataBlock next() - { - AMQBody cb = - getProtocolVersionMethodConverter().convertToBody(getContentChunk(++_index)); - - return new AMQFrame(_channel, cb); - } - - private ProtocolVersionMethodConverter getProtocolVersionMethodConverter() - { - return _protocolSession.getMethodRegistry().getProtocolVersionMethodConverter(); - } - - public void remove() - { - throw new UnsupportedOperationException(); - } - } - - private class BodyContentIterator implements Iterator<ContentChunk> - { - - private int _index = -1; - - public boolean hasNext() - { - return _index < (getBodyCount() - 1); - } - - public ContentChunk next() - { - return getContentChunk(++_index); - } - - public void remove() - { - throw new UnsupportedOperationException(); - } - } - - /** - * Used by SimpleAMQQueueTest, TxAckTest.TestMessage, AbstractHeaderExchangeTestBase.Message - * These all need refactoring to some sort of MockAMQMessageFactory. - */ - @Deprecated - protected TransientAMQMessage(AMQMessage message) throws AMQException - { - _messageId = message.getMessageId(); - _flags = ((TransientAMQMessage) message)._flags; - _contentHeaderBody = message.getContentHeaderBody(); - _messagePublishInfo = message.getMessagePublishInfo(); - } - - /** - * Normal message creation via the MessageFactory uses this constructor - * Package scope limited as MessageFactory should be used - * - * @param messageId - * - * @see MessageFactory - */ - TransientAMQMessage(Long messageId) - { - _messageId = messageId; - } - - public String debugIdentity() - { - return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() +")"; - } - - public void setExpiration(long expiration) - { - _expiration = expiration; - } - - public long getExpiration() - { - return _expiration; - } - - public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel) - { - return new BodyFrameIterator(protocolSession, channel); - } - - public Iterator<ContentChunk> getContentBodyIterator() - { - return new BodyContentIterator(); - } - - public ContentHeaderBody getContentHeaderBody() - { - return _contentHeaderBody; - } - - public Long getMessageId() - { - return _messageId; - } - - - public long getSize() - { - return _contentHeaderBody.bodySize; - } - - public Object getPublisherClientInstance() - { - return _sessionIdentifier.getSessionInstance(); - } - - public Object getPublisherIdentifier() - { - return _sessionIdentifier.getSessionIdentifier(); - } - - public void setClientIdentifier(final AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier) - { - _sessionIdentifier = sessionIdentifier; - } - - /** From AMQMessageHandle * */ - - public int getBodyCount() - { - return _contentBodies.size(); - } - - public ContentChunk getContentChunk(int index) - { - if (_contentBodies == null) - { - throw new RuntimeException("No ContentBody has been set"); - } - - if (index > _contentBodies.size() - 1 || index < 0) - { - throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " + - (_contentBodies.size() - 1)); - } - return _contentBodies.get(index); - } - - public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody) - throws AMQException - { - if (_contentBodies == null) - { - if (isLastContentBody) - { - _contentBodies = Collections.singletonList(contentChunk); - } - else - { - _contentBodies = new ArrayList<ContentChunk>(); - _contentBodies.add(contentChunk); - } - } - else - { - _contentBodies.add(contentChunk); - } - } - - public MessagePublishInfo getMessagePublishInfo() - { - return _messagePublishInfo; - } - - public boolean isPersistent() - { - return false; - } - - public boolean isImmediate() - { - return _messagePublishInfo.isImmediate(); - } - - /** - * This is called when all the content has been received. - * - * @param storeContext - * @param messagePublishInfo - * @param contentHeaderBody @throws AMQException - */ - public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo, - ContentHeaderBody contentHeaderBody) - throws AMQException - { - - if (contentHeaderBody == null) - { - throw new NullPointerException("HeaderBody cannot be null"); - } - - if (messagePublishInfo == null) - { - throw new NullPointerException("PublishInfo cannot be null"); - } - - _arrivalTime = System.currentTimeMillis(); - - - _contentHeaderBody = contentHeaderBody; - _messagePublishInfo = messagePublishInfo; - - updateHeaderAndFlags(); - } - - public long getArrivalTime() - { - return _arrivalTime; - } - - public void recoverFromMessageMetaData(MessageMetaData mmd) - { - _arrivalTime = mmd.getArrivalTime(); - _contentHeaderBody = mmd.getContentHeaderBody(); - _messagePublishInfo = mmd.getMessagePublishInfo(); - - updateHeaderAndFlags(); - } - - private void updateHeaderAndFlags() - { - if (_contentHeaderBody.bodySize == 0) - { - _contentBodies = Collections.EMPTY_LIST; - } - } - - public void recoverContentBodyFrame(ContentChunk contentChunk, boolean isLastContentBody) throws AMQException - { - addContentBodyFrame(null, contentChunk, isLastContentBody); - } - - - public String toString() - { - // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + - // _taken + " by :" + _takenBySubcription; - - return "Message[" + debugIdentity() + "]: " + getMessageId() ; - } - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java deleted file mode 100644 index b09283b11f..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import java.util.LinkedList; -import java.util.List; -import java.util.ArrayList; -import java.util.Collections; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; - -/** - * Contains data that is only used in AMQMessage transiently, e.g. while the content - * body fragments are arriving. - * - * Having this data stored in a separate class means that the AMQMessage class avoids - * the small overhead of numerous guaranteed-null references. - * - * @author Apache Software Foundation - */ -public class TransientMessageData -{ - /** - * Stored temporarily until the header has been received at which point it is used when - * constructing the handle - */ - private MessagePublishInfo _messagePublishInfo; - - /** - * Also stored temporarily. - */ - private ContentHeaderBody _contentHeaderBody; - - /** - * Keeps a track of how many bytes we have received in body frames - */ - private long _bodyLengthReceived = 0; - - /** - * This is stored during routing, to know the queues to which this message should immediately be - * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done - * by the message handle. - */ - private List<AMQQueue> _destinationQueues; - - public MessagePublishInfo getMessagePublishInfo() - { - return _messagePublishInfo; - } - - public void setMessagePublishInfo(MessagePublishInfo messagePublishInfo) - { - _messagePublishInfo = messagePublishInfo; - } - - public List<AMQQueue> getDestinationQueues() - { - return _destinationQueues == null ? (List<AMQQueue>) Collections.EMPTY_LIST : _destinationQueues; - } - - public void setDestinationQueues(List<AMQQueue> destinationQueues) - { - _destinationQueues = destinationQueues; - } - - public ContentHeaderBody getContentHeaderBody() - { - return _contentHeaderBody; - } - - public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) - { - _contentHeaderBody = contentHeaderBody; - } - - public long getBodyLengthReceived() - { - return _bodyLengthReceived; - } - - public void addBodyLength(int value) - { - _bodyLengthReceived += value; - } - - public boolean isAllContentReceived() throws AMQException - { - return _bodyLengthReceived == _contentHeaderBody.bodySize; - } - - public void addDestinationQueue(AMQQueue queue) - { - if(_destinationQueues == null) - { - _destinationQueues = new ArrayList<AMQQueue>(); - } - _destinationQueues.add(queue); - } - - public boolean isPersistent() - { - return _contentHeaderBody.properties instanceof BasicContentHeaderProperties && - ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == - BasicContentHeaderProperties.PERSISTENT; - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToFlowMessageException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToFlowMessageException.java deleted file mode 100644 index 03cfed8533..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToFlowMessageException.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -public class UnableToFlowMessageException extends Exception -{ - public UnableToFlowMessageException(long messageId, Exception error) - { - super("Unable to Flow Message:"+messageId, error); - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToRecoverMessageException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToRecoverMessageException.java deleted file mode 100644 index cae5bc6327..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToRecoverMessageException.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -public class UnableToRecoverMessageException extends RuntimeException -{ - public UnableToRecoverMessageException(Exception error) - { - super(error); - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/UnauthorizedAccessException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/UnauthorizedAccessException.java deleted file mode 100644 index 295cb266b9..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/UnauthorizedAccessException.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.queue; - -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.RequiredDeliveryException; - -/** - * UnauthorizedAccessException is a {@link RequiredDeliveryException} that represents the failure case where a message - * is published with a user id different from the one used when creating the connection . - * The AMQP status code, 403, is always used to report this condition. - * - */ - -public class UnauthorizedAccessException extends RequiredDeliveryException -{ - public UnauthorizedAccessException(String msg, AMQMessage amqMessage) - { - super(msg, amqMessage); - } - - public AMQConstant getReplyCode() - { - return AMQConstant.ACCESS_REFUSED; - } -} |