summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-04-14 15:46:39 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-04-14 15:46:39 +0000
commit1b2b4b309e9392e0523cd62accb8704fd089eef8 (patch)
tree81800138f1c90a88d004b7fad90a69a902ab44dd /java/broker/src/main/java/org/apache/qpid/server/queue
parent86f2fa80575d0db1fe1395172f6dccf23c7d5018 (diff)
downloadqpid-python-1b2b4b309e9392e0523cd62accb8704fd089eef8.tar.gz
QPID-1807 : Remove old broker and FlowToDisk related tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@764838 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java99
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java77
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java220
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java140
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java494
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java71
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBinding.java84
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java82
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java50
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java385
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java194
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java33
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java550
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java301
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java302
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java52
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java119
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java92
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java47
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java129
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java73
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java178
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java64
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStoreFactory.java32
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java233
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java568
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java30
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java76
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java26
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java27
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java43
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java1666
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java181
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java332
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java127
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/UnableToFlowMessageException.java29
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/UnableToRecoverMessageException.java29
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/UnauthorizedAccessException.java45
38 files changed, 0 insertions, 7280 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
deleted file mode 100644
index 8dac12fe24..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.AMQException;
-
-import java.util.Iterator;
-
-public interface AMQMessage
-{
- //Get Content relating to this message
-
- Long getMessageId();
-
- Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel);
-
- Iterator<ContentChunk> getContentBodyIterator();
-
- ContentHeaderBody getContentHeaderBody();
-
- ContentChunk getContentChunk(int index);
-
- Object getPublisherClientInstance();
-
- Object getPublisherIdentifier();
-
- MessagePublishInfo getMessagePublishInfo();
-
- int getBodyCount();
-
- long getSize();
-
- long getArrivalTime();
-
-
-
- //Check the status of this message
-
- /** Is this a persistent message
- *
- * @return true if the message is persistent
- */
- boolean isPersistent();
-
-
- boolean isImmediate();
-
-
- void setClientIdentifier(AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier);
-
- /**
- * This is called when all the content has been received.
- * @param storeContext
- *@param messagePublishInfo
- * @param contentHeaderBody @throws org.apache.qpid.AMQException
- */
- void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo, ContentHeaderBody contentHeaderBody)
- throws AMQException;
-
- void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody)
- throws AMQException;
-
- void recoverFromMessageMetaData(MessageMetaData mmd);
-
- void recoverContentBodyFrame(ContentChunk contentChunk, boolean isLastContentBody) throws AMQException;
-
-
- String toString();
-
- String debugIdentity();
-
- void setExpiration(long expiration);
-
- long getExpiration();
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
deleted file mode 100644
index 00dec57ed5..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.subscription.SubscriptionList;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.AMQException;
-
-public class AMQPriorityQueue extends SimpleAMQQueue
-{
- protected AMQPriorityQueue(final AMQShortString name,
- final boolean durable,
- final AMQShortString owner,
- final boolean autoDelete,
- final VirtualHost virtualHost,
- int priorities)
- throws AMQException
- {
- super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueEntryList.Factory(priorities));
- }
-
- public int getPriorities()
- {
- return ((PriorityQueueEntryList) _entries).getPriorities();
- }
-
- @Override
- protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
- {
- // check that all subscriptions are not in advance of the entry
- SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator();
- while(subIter.advance() && !entry.isAcquired())
- {
- final Subscription subscription = subIter.getNode().getSubscription();
- QueueEntry subnode = subscription.getLastSeenEntry();
- while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired())
- {
- if(subscription.setLastSeenEntry(subnode,entry))
- {
- break;
- }
- else
- {
- subnode = subscription.getLastSeenEntry();
- }
- }
-
- }
- }
-
- @Override
- public String getType()
- {
- return getClass().getSimpleName() + "[" + getName() + "][Priorities:" + getPriorities() + "]";
- }
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
deleted file mode 100644
index fae219e320..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.management.Managable;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.configuration.QueueConfiguration;
-import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
-
-import java.util.List;
-import java.util.Set;
-
-public interface AMQQueue extends Managable, Comparable<AMQQueue>
-{
-
- AMQShortString getName();
-
- boolean isDurable();
-
- boolean isAutoDelete();
-
- AMQShortString getOwner();
-
- VirtualHost getVirtualHost();
-
-
- void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException;
-
- void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException;
-
- List<ExchangeBinding> getExchangeBindings();
-
-
- void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException;
-
- void unregisterSubscription(final Subscription subscription) throws AMQException;
-
-
- int getConsumerCount();
-
- int getActiveConsumerCount();
-
- boolean isUnused();
-
- boolean isEmpty();
-
- boolean isFlowed();
-
- int getMessageCount();
-
- int getUndeliveredMessageCount();
-
-
- long getQueueDepth();
-
- long getReceivedMessageCount();
-
- long getOldestMessageArrivalTime();
-
- boolean isDeleted();
-
- int delete() throws AMQException;
-
- QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException;
-
- void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException;
-
- void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException;
-
- boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
-
- void addQueueDeleteTask(final Task task);
-
- List<QueueEntry> getMessagesOnTheQueue();
-
- List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId);
-
- List<Long> getMessagesOnTheQueue(int num);
-
- List<Long> getMessagesOnTheQueue(int num, int offest);
-
- QueueEntry getMessageOnTheQueue(long messageId);
-
-
- void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
- StoreContext storeContext);
-
- void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, StoreContext storeContext);
-
- void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext);
-
- long getMemoryUsageMaximum();
-
- void setMemoryUsageMaximum(long maximumMemoryUsage);
-
- long getMemoryUsageMinimum();
-
- void setMemoryUsageMinimum(long minimumMemoryUsage);
-
- long getMemoryUsageCurrent();
-
- long getMaximumMessageSize();
-
- void setMaximumMessageSize(long value);
-
-
- long getMaximumMessageCount();
-
- void setMaximumMessageCount(long value);
-
-
- long getMaximumQueueDepth();
-
- void setMaximumQueueDepth(long value);
-
-
- long getMaximumMessageAge();
-
- void setMaximumMessageAge(final long maximumMessageAge);
-
-
- long getMinimumAlertRepeatGap();
-
- void setMinimumAlertRepeatGap(long value);
-
-
- void deleteMessageFromTop(StoreContext storeContext) throws AMQException;
-
- long clearQueue(StoreContext storeContext) throws AMQException;
-
- /**
- * Checks the status of messages on the queue, purging expired ones, firing age related alerts etc.
- * @throws AMQException
- */
- void checkMessageStatus() throws AMQException;
-
- Set<NotificationCheck> getNotificationChecks();
-
- void flushSubscription(final Subscription sub) throws AMQException;
-
- void deliverAsync(final Subscription sub);
-
- void deliverAsync();
-
- void stop();
-
- /**
- * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
- * already exists.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represent failure to create a subscription, because an exclusive subscription already exists.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Move to top level, used outside this class.
- */
- static final class ExistingExclusiveSubscription extends AMQException
- {
-
- public ExistingExclusiveSubscription()
- {
- super("");
- }
- }
-
- /**
- * ExistingSubscriptionPreventsExclusive signals a failure to create an exclusize subscription, as a subscription
- * already exists.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represent failure to create an exclusize subscription, as a subscription already exists.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Move to top level, used outside this class.
- */
- static final class ExistingSubscriptionPreventsExclusive extends AMQException
- {
- public ExistingSubscriptionPreventsExclusive()
- {
- super("");
- }
- }
-
- static interface Task
- {
- public void doTask(AMQQueue queue) throws AMQException;
- }
-
- void configure(QueueConfiguration config);
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
deleted file mode 100644
index b77a9d8f6a..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.configuration.QueueConfiguration;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-public class AMQQueueFactory
-{
- public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
- public static final AMQShortString QPID_MAX_COUNT = new AMQShortString("qpid.max_count");
- public static final AMQShortString QPID_MAX_SIZE = new AMQShortString("qpid.max_size");
- public static final AMQShortString QPID_POLICY_TYPE = new AMQShortString("qpid.policy_type");
- public static final String QPID_FLOW_TO_DISK = "flow_to_disk";
-
- public static AMQQueue createAMQQueueImpl(AMQShortString name,
- boolean durable,
- AMQShortString owner,
- boolean autoDelete,
- VirtualHost virtualHost, final FieldTable arguments)
- throws AMQException
- {
-
- int priorities = 1;
-
- if (arguments != null && arguments.containsKey(X_QPID_PRIORITIES))
- {
- Integer priority = arguments.getInteger(X_QPID_PRIORITIES);
-
- if (priority != null)
- {
- priorities = priority.intValue();
- }
- else
- {
- throw new AMQException(AMQConstant.INVALID_ARGUMENT,
- "Queue create request with non integer value for :" + X_QPID_PRIORITIES + "=" + arguments.get(X_QPID_PRIORITIES), null);
- }
-
- }
-
- AMQQueue q = null;
- if (priorities > 1)
- {
- q = new AMQPriorityQueue(name, durable, owner, autoDelete, virtualHost, priorities);
- }
- else
- {
- q = new SimpleAMQQueue(name, durable, owner, autoDelete, virtualHost);
- }
-
- final String queuePolicyType = arguments == null ? null :
- arguments.containsKey(QPID_POLICY_TYPE) ? arguments.getString(QPID_POLICY_TYPE) : null;
-
- if (queuePolicyType != null)
- {
- if (queuePolicyType.equals(QPID_FLOW_TO_DISK))
- {
- if (arguments.containsKey(QPID_MAX_SIZE))
- {
-
- final long queueSize = arguments.getInteger(QPID_MAX_SIZE);
-
- if (queueSize < 0)
- {
- throw new AMQException(AMQConstant.INVALID_ARGUMENT,
- "Queue create request with negative size:" + queueSize, null);
- }
-
- q.setMemoryUsageMaximum(queueSize);
- }
- else
- {
- throw new AMQException(AMQConstant.INVALID_ARGUMENT,
- "Queue create request with no qpid.max_size value,", null);
- }
- }
- else
- {
- throw new AMQException(AMQConstant.NOT_IMPLEMENTED,
- "Queue create request with unknown Policy Type:" + queuePolicyType, null);
- }
-
- }
-
- //Register the new queue
- virtualHost.getQueueRegistry().registerQueue(q);
- return q;
- }
-
- public static AMQQueue createAMQQueueImpl(QueueConfiguration config, VirtualHost host) throws AMQException
- {
- AMQShortString queueName = new AMQShortString(config.getName());
-
- boolean durable = config.getDurable();
- boolean autodelete = config.getAutoDelete();
- AMQShortString owner = (config.getOwner() != null) ? new AMQShortString(config.getOwner()) : null;
- FieldTable arguments = null;
- boolean priority = config.getPriority();
- int priorities = config.getPriorities();
- if (priority || priorities > 0)
- {
- if (arguments == null)
- {
- arguments = new FieldTable();
- }
- if (priorities < 0)
- {
- priorities = 10;
- }
- arguments.put(new AMQShortString("x-qpid-priorities"), priorities);
- }
-
- AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, host, arguments);
- q.configure(config);
- return q;
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
deleted file mode 100644
index b46d6b6f12..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ /dev/null
@@ -1,494 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.log4j.Logger;
-
-import org.apache.mina.common.ByteBuffer;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.CommonContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.store.StoreContext;
-
-import javax.management.JMException;
-import javax.management.MBeanException;
-import javax.management.MBeanNotificationInfo;
-import javax.management.Notification;
-import javax.management.OperationsException;
-import javax.management.monitor.MonitorNotification;
-import javax.management.openmbean.ArrayType;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
-import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
-import javax.management.openmbean.TabularData;
-import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
-
-import java.text.SimpleDateFormat;
-import java.util.*;
-
-/**
- * AMQQueueMBean is the management bean for an {@link AMQQueue}.
- *
- * <p/><tablse id="crc"><caption>CRC Caption</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * </table>
- */
-@MBeanDescription("Management Interface for AMQQueue")
-public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
-{
- /** Used for debugging purposes. */
- private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class);
-
- private static final SimpleDateFormat _dateFormat = new SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z");
-
- /**
- * Since the MBean is not associated with a real channel we can safely create our own store context
- * for use in the few methods that require one.
- */
- private StoreContext _storeContext = new StoreContext();
-
- private AMQQueue _queue = null;
- private String _queueName = null;
- // OpenMBean data types for viewMessages method
- private static final String[] _msgAttributeNames = { "AMQ MessageId", "Header", "Size(bytes)", "Redelivered" };
- private static String[] _msgAttributeIndex = { _msgAttributeNames[0] };
- private static OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types.
- private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
- private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
-
- // OpenMBean data types for viewMessageContent method
- private static CompositeType _msgContentType = null;
- private static final String[] _msgContentAttributes = { "AMQ MessageId", "MimeType", "Encoding", "Content" };
- private static OpenType[] _msgContentAttributeTypes = new OpenType[4];
-
- private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
- private Notification _lastNotification = null;
-
-
-
-
- @MBeanConstructor("Creates an MBean exposing an AMQQueue")
- public AMQQueueMBean(AMQQueue queue) throws JMException
- {
- super(ManagedQueue.class, ManagedQueue.TYPE, ManagedQueue.VERSION);
- _queue = queue;
- _queueName = jmxEncode(new StringBuffer(queue.getName()), 0).toString();
- }
-
- public ManagedObject getParentObject()
- {
- return _queue.getVirtualHost().getManagedObject();
- }
-
- static
- {
- try
- {
- init();
- }
- catch (JMException ex)
- {
- // This is not expected to ever occur.
- throw new RuntimeException("Got JMException in static initializer.", ex);
- }
- }
-
- /**
- * initialises the openmbean data types
- */
- private static void init() throws OpenDataException
- {
- _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id
- _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType
- _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
- _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
- _msgContentType =
- new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes, _msgContentAttributes,
- _msgContentAttributeTypes);
-
- _msgAttributeTypes[0] = SimpleType.LONG; // For message id
- _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
- _msgAttributeTypes[2] = SimpleType.LONG; // For size
- _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
-
- _messageDataType =
- new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
- _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex);
- }
-
- public String getObjectInstanceName()
- {
- return _queueName;
- }
-
- public String getName()
- {
- return _queueName;
- }
-
- public boolean isDurable()
- {
- return _queue.isDurable();
- }
-
- public String getOwner()
- {
- return String.valueOf(_queue.getOwner());
- }
-
- public boolean isAutoDelete()
- {
- return _queue.isAutoDelete();
- }
-
- public Integer getMessageCount()
- {
- return _queue.getMessageCount();
- }
-
- public Long getMaximumMessageSize()
- {
- return _queue.getMaximumMessageSize();
- }
-
- public Long getMaximumMessageAge()
- {
- return _queue.getMaximumMessageAge();
- }
-
- public void setMaximumMessageAge(Long maximumMessageAge)
- {
- _queue.setMaximumMessageAge(maximumMessageAge);
- }
-
- public void setMaximumMessageSize(Long value)
- {
- _queue.setMaximumMessageSize(value);
- }
-
- public Integer getConsumerCount()
- {
- return _queue.getConsumerCount();
- }
-
- public Integer getActiveConsumerCount()
- {
- return _queue.getActiveConsumerCount();
- }
-
- public Long getReceivedMessageCount()
- {
- return _queue.getReceivedMessageCount();
- }
-
- public Long getMaximumMessageCount()
- {
- return _queue.getMaximumMessageCount();
- }
-
- public void setMaximumMessageCount(Long value)
- {
- _queue.setMaximumMessageCount(value);
- }
-
- /**
- * returns the maximum total size of messages(bytes) in the queue.
- */
- public Long getMaximumQueueDepth()
- {
- return _queue.getMaximumQueueDepth();
- }
-
- public void setMaximumQueueDepth(Long value)
- {
- _queue.setMaximumQueueDepth(value);
- }
-
- public Long getMemoryUsageMaximum()
- {
- return _queue.getMemoryUsageMaximum();
- }
-
- public void setMemoryUsageMaximum(Long maximumMemoryUsage)
- {
- _queue.setMemoryUsageMaximum(maximumMemoryUsage);
- }
-
- public Long getMemoryUsageMinimum()
- {
- return _queue.getMemoryUsageMinimum();
- }
-
- public void setMemoryUsageMinimum(Long minimumMemoryUsage)
- {
- _queue.setMemoryUsageMinimum(minimumMemoryUsage);
- }
-
- public Long getMemoryUsageCurrent()
- {
- return _queue.getMemoryUsageCurrent();
- }
-
- public boolean isFlowed()
- {
- return _queue.isFlowed();
- }
-
- /**
- * returns the total size of messages(bytes) in the queue.
- */
- public Long getQueueDepth() throws JMException
- {
- return _queue.getQueueDepth();
- }
-
- /**
- * Checks if there is any notification to be send to the listeners
- * @param queueEntry
- */
- public void checkForNotification(QueueEntry queueEntry) throws AMQException
- {
-
- final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks();
-
- if(!notificationChecks.isEmpty())
- {
- final long currentTime = System.currentTimeMillis();
- final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
-
- for (NotificationCheck check : notificationChecks)
- {
- if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
- {
- if (check.notifyIfNecessary(queueEntry, _queue, this))
- {
- _lastNotificationTimes[check.ordinal()] = currentTime;
- }
- }
- }
- }
-
- }
-
- /**
- * Sends the notification to the listeners
- */
- public void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg)
- {
- // important : add log to the log file - monitoring tools may be looking for this
- _logger.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg);
- notificationMsg = notification.name() + " " + notificationMsg;
-
- _lastNotification =
- new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber,
- System.currentTimeMillis(), notificationMsg);
-
- _broadcaster.sendNotification(_lastNotification);
- }
-
- public Notification getLastNotification()
- {
- return _lastNotification;
- }
-
- /**
- * @see AMQQueue#deleteMessageFromTop
- */
- public void deleteMessageFromTop() throws JMException
- {
- try
- {
- _queue.deleteMessageFromTop(_storeContext);
- }
- catch (AMQException ex)
- {
- throw new MBeanException(ex, ex.toString());
- }
- }
-
- /**
- * @see AMQQueue#clearQueue
- */
- public void clearQueue() throws JMException
- {
- try
- {
- _queue.clearQueue(_storeContext);
- }
- catch (AMQException ex)
- {
- throw new MBeanException(ex, ex.toString());
- }
- }
-
- /**
- * returns message content as byte array and related attributes for the given message id.
- */
- public CompositeData viewMessageContent(long msgId) throws JMException
- {
- QueueEntry entry = _queue.getMessageOnTheQueue(msgId);
-
- if (entry == null)
- {
- throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
- }
-
- AMQMessage msg = entry.getMessage();
- // get message content
- Iterator<ContentChunk> cBodies = msg.getContentBodyIterator();
- List<Byte> msgContent = new ArrayList<Byte>();
- while (cBodies.hasNext())
- {
- ContentChunk body = cBodies.next();
- if (body.getSize() != 0)
- {
- if (body.getSize() != 0)
- {
- ByteBuffer slice = body.getData().slice();
- for (int j = 0; j < slice.limit(); j++)
- {
- msgContent.add(slice.get());
- }
- }
- }
- }
-
- // Create header attributes list
- CommonContentHeaderProperties headerProperties =
- (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
- String mimeType = null, encoding = null;
- if (headerProperties != null)
- {
- AMQShortString mimeTypeShortSting = headerProperties.getContentType();
- mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
- encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
- }
-
- Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
-
- return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
- }
-
- /**
- * Returns the header contents of the messages stored in this queue in tabular form.
- */
- public TabularData viewMessages(int beginIndex, int endIndex) throws JMException
- {
- if ((beginIndex > endIndex) || (beginIndex < 1))
- {
- throw new OperationsException("From Index = " + beginIndex + ", To Index = " + endIndex
- + "\n\"From Index\" should be greater than 0 and less than \"To Index\"");
- }
-
- List<QueueEntry> list = _queue.getMessagesOnTheQueue();
- TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
-
- // Create the tabular list of message header contents
- for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++)
- {
- QueueEntry queueEntry = list.get(i - 1);
- AMQMessage msg = queueEntry.getMessage();
- ContentHeaderBody headerBody = msg.getContentHeaderBody();
- // Create header attributes list
- String[] headerAttributes = getMessageHeaderProperties(headerBody);
- Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize,
- queueEntry.isRedelivered() };
- CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
- _messageList.put(messageData);
- }
-
- return _messageList;
- }
-
- private String[] getMessageHeaderProperties(ContentHeaderBody headerBody)
- {
- List<String> list = new ArrayList<String>();
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
- list.add("reply-to = " + headerProperties.getReplyToAsString());
- list.add("propertyFlags = " + headerProperties.getPropertyFlags());
- list.add("ApplicationID = " + headerProperties.getAppIdAsString());
- list.add("ClusterID = " + headerProperties.getClusterIdAsString());
- list.add("UserId = " + headerProperties.getUserIdAsString());
- list.add("JMSMessageID = " + headerProperties.getMessageIdAsString());
- list.add("JMSCorrelationID = " + headerProperties.getCorrelationIdAsString());
-
- int delMode = headerProperties.getDeliveryMode();
- list.add("JMSDeliveryMode = " +
- ((delMode == BasicContentHeaderProperties.PERSISTENT) ? "Persistent" : "Non_Persistent"));
-
- list.add("JMSPriority = " + headerProperties.getPriority());
- list.add("JMSType = " + headerProperties.getType());
-
- long longDate = headerProperties.getExpiration();
- String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
- list.add("JMSExpiration = " + strDate);
-
- longDate = headerProperties.getTimestamp();
- strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
- list.add("JMSTimestamp = " + strDate);
-
- return list.toArray(new String[list.size()]);
- }
-
- /**
- * @see ManagedQueue#moveMessages
- * @param fromMessageId
- * @param toMessageId
- * @param toQueueName
- * @throws JMException
- */
- public void moveMessages(long fromMessageId, long toMessageId, String toQueueName) throws JMException
- {
- if ((fromMessageId > toMessageId) || (fromMessageId < 1))
- {
- throw new OperationsException("\"From MessageId\" should be greater then 0 and less then \"To MessageId\"");
- }
-
- _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, _storeContext);
- }
-
- /**
- * returns Notifications sent by this MBean.
- */
- @Override
- public MBeanNotificationInfo[] getNotificationInfo()
- {
- String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED };
- String name = MonitorNotification.class.getName();
- String description = "Either Message count or Queue depth or Message size has reached threshold high value";
- MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
-
- return new MBeanNotificationInfo[] { info1 };
- }
-
-} // End of AMQQueueMBean class
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
deleted file mode 100644
index cbe9246f09..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import java.util.Collection;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-public class DefaultQueueRegistry implements QueueRegistry
-{
- private ConcurrentMap<AMQShortString, AMQQueue> _queueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>();
-
- private final VirtualHost _virtualHost;
-
- public DefaultQueueRegistry(VirtualHost virtualHost)
- {
- _virtualHost = virtualHost;
- }
-
- public VirtualHost getVirtualHost()
- {
- return _virtualHost;
- }
-
- public void registerQueue(AMQQueue queue) throws AMQException
- {
- _queueMap.put(queue.getName(), queue);
- }
-
- public void unregisterQueue(AMQShortString name) throws AMQException
- {
- _queueMap.remove(name);
- }
-
- public AMQQueue getQueue(AMQShortString name)
- {
- return _queueMap.get(name);
- }
-
- public Collection<AMQShortString> getQueueNames()
- {
- return _queueMap.keySet();
- }
-
- public Collection<AMQQueue> getQueues()
- {
- return _queueMap.values();
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBinding.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBinding.java
deleted file mode 100644
index a2fcab9e73..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBinding.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
-
-public class ExchangeBinding
-{
- private final Exchange _exchange;
- private final AMQShortString _routingKey;
- private final FieldTable _arguments;
-
- private static final FieldTable EMPTY_ARGUMENTS = new FieldTable();
-
- ExchangeBinding(AMQShortString routingKey, Exchange exchange)
- {
- this(routingKey, exchange, EMPTY_ARGUMENTS);
- }
-
- ExchangeBinding(AMQShortString routingKey, Exchange exchange, FieldTable arguments)
- {
- _routingKey = routingKey == null ? AMQShortString.EMPTY_STRING : routingKey;
- _exchange = exchange;
- _arguments = arguments == null ? EMPTY_ARGUMENTS : arguments;
- }
-
- void unbind(AMQQueue queue) throws AMQException
- {
- _exchange.deregisterQueue(_routingKey, queue, _arguments);
- }
-
- public Exchange getExchange()
- {
- return _exchange;
- }
-
- public AMQShortString getRoutingKey()
- {
- return _routingKey;
- }
-
- public FieldTable getArguments()
- {
- return _arguments;
- }
-
- public int hashCode()
- {
- return (_exchange == null ? 0 : _exchange.hashCode())
- + (_routingKey == null ? 0 : _routingKey.hashCode());
- }
-
- public boolean equals(Object o)
- {
- if (!(o instanceof ExchangeBinding))
- {
- return false;
- }
- ExchangeBinding eb = (ExchangeBinding) o;
- return _exchange.equals(eb._exchange)
- && _routingKey.equals(eb._routingKey);
- }
-} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
deleted file mode 100644
index fb839c1783..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.exchange.Exchange;
-
-/**
- * When a queue is deleted, it should be deregistered from any
- * exchange it has been bound to. This class assists in this task,
- * by keeping track of all bindings for a given queue.
- */
-class ExchangeBindings
-{
- private final List<ExchangeBinding> _bindings = new CopyOnWriteArrayList<ExchangeBinding>();
- private final AMQQueue _queue;
-
- ExchangeBindings(AMQQueue queue)
- {
- _queue = queue;
- }
-
- /**
- * Adds the specified binding to those being tracked.
- * @param routingKey the routing key with which the queue whose bindings
- * are being tracked by the instance has been bound to the exchange
- * @param exchange the exchange bound to
- */
- void addBinding(AMQShortString routingKey, FieldTable arguments, Exchange exchange)
- {
- _bindings.add(new ExchangeBinding(routingKey, exchange, arguments));
- }
-
-
- public boolean remove(AMQShortString routingKey, FieldTable arguments, Exchange exchange)
- {
- return _bindings.remove(new ExchangeBinding(routingKey, exchange, arguments));
- }
-
-
- /**
- * Deregisters this queue from any exchange it has been bound to
- */
- void deregister() throws AMQException
- {
- //remove duplicates at this point
- HashSet<ExchangeBinding> copy = new HashSet<ExchangeBinding>(_bindings);
- for (ExchangeBinding b : copy)
- {
- b.unbind(_queue);
- }
- }
-
- List<ExchangeBinding> getExchangeBindings()
- {
- return _bindings;
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java
deleted file mode 100644
index 6466e81dd2..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-
-/**
- * Signals that the dequeue of a message from a queue failed.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Indicates the a message could not be dequeued from a queue.
- * <tr><td>
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Happens as a consequence of a message store failure, or reference counting error. Both of which migh become
- * runtime exceptions, as unrecoverable conditions? In which case this one might be dropped too.
- */
-public class FailedDequeueException extends AMQException
-{
- public FailedDequeueException(String queue)
- {
- super("Failed to dequeue message from " + queue);
- }
-
- public FailedDequeueException(String queue, AMQException e)
- {
- super("Failed to dequeue message from " + queue, e);
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java
deleted file mode 100644
index b9d07d032b..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java
+++ /dev/null
@@ -1,385 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.util.FileUtils;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-public class FileQueueBackingStore implements QueueBackingStore
-{
- private static final Logger _log = Logger.getLogger(FileQueueBackingStore.class);
-
- private String _flowToDiskLocation;
-
- public FileQueueBackingStore(String location)
- {
- _flowToDiskLocation = location;
- }
-
- public AMQMessage load(Long messageId)
- {
- _log.info("Loading Message (ID:" + messageId + ")");
-
- MessageMetaData mmd;
-
- File handle = getFileHandle(messageId);
-
- ObjectInputStream input = null;
-
- Exception error = null;
- try
- {
- input = new ObjectInputStream(new FileInputStream(handle));
-
- long arrivaltime = input.readLong();
-
- final AMQShortString exchange = new AMQShortString(input.readUTF());
- final AMQShortString routingKey = new AMQShortString(input.readUTF());
- final boolean mandatory = input.readBoolean();
- final boolean immediate = input.readBoolean();
-
- int bodySize = input.readInt();
- byte[] underlying = new byte[bodySize];
-
- input.readFully(underlying, 0, bodySize);
-
- ByteBuffer buf = ByteBuffer.wrap(underlying);
-
- ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(buf, bodySize);
-
- int chunkCount = input.readInt();
-
- // There are WAY to many annonymous MPIs in the code this should be made concrete.
- MessagePublishInfo info = new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return exchange;
- }
-
- public void setExchange(AMQShortString exchange)
- {
-
- }
-
- public boolean isImmediate()
- {
- return immediate;
- }
-
- public boolean isMandatory()
- {
- return mandatory;
- }
-
- public AMQShortString getRoutingKey()
- {
- return routingKey;
- }
- };
-
- mmd = new MessageMetaData(info, chb, chunkCount);
- mmd.setArrivalTime(arrivaltime);
-
- AMQMessage message;
- if (((BasicContentHeaderProperties) chb.properties).getDeliveryMode() ==
- BasicContentHeaderProperties.PERSISTENT)
- {
- message = new PersistentAMQMessage(messageId, null);
- }
- else
- {
- message = new TransientAMQMessage(messageId);
- }
-
- message.recoverFromMessageMetaData(mmd);
-
- for (int chunk = 0; chunk < chunkCount; chunk++)
- {
- int length = input.readInt();
-
- byte[] data = new byte[length];
-
- input.readFully(data, 0, length);
-
- try
- {
- message.recoverContentBodyFrame(new RecoverDataBuffer(length, data), (chunk + 1 == chunkCount));
- }
- catch (AMQException e)
- {
- //ignore as this will not occur.
- // It is thrown by the _transactionLog method in load on PersistentAMQMessage
- // but we have created the message with a null log and will never call that method.
- }
- }
-
- return message;
- }
- catch (Exception e)
- {
- error = e;
- }
- finally
- {
- try
- {
- if (input != null)
- {
- input.close();
- }
- }
- catch (IOException e)
- {
- _log.info("Unable to close input on message(" + messageId + ") recovery due to:" + e.getMessage());
- }
- }
-
- throw new UnableToRecoverMessageException(error);
- }
-
- /**
- * Thread safety is ensured here by synchronizing on the message object.
- *
- * This is safe as load() calls will fail until the first thread through here has created the file on disk
- * and fully written the content.
- *
- * After this point new AMQMessages can exist that reference the same data thus breaking the synchronisation.
- *
- * Thread safety is maintained here as the existence of the file is checked allowing then subsequent unload() calls
- * to skip the writing.
- *
- * Multiple unload() calls will initially be blocked using the synchronization until the data exists on disk thus
- * safely allowing any reference to the message to be cleared prompting a load call.
- *
- * @param message the message to unload
- * @throws UnableToFlowMessageException
- */
- public void unload(AMQMessage message) throws UnableToFlowMessageException
- {
- //Synchorize on the message to ensure that one only thread can unload at a time.
- // If a second unload is attempted then it will block until the unload has completed.
- synchronized (message)
- {
- long messageId = message.getMessageId();
-
- File handle = getFileHandle(messageId);
-
- //If we have written the data once then we don't need to do it again.
- if (handle.exists())
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Message(ID:" + messageId + ") already unloaded.");
- }
- return;
- }
-
- if (_log.isInfoEnabled())
- {
- _log.info("Unloading Message (ID:" + messageId + ")");
- }
-
- ObjectOutputStream writer = null;
- Exception error = null;
-
- try
- {
- writer = new ObjectOutputStream(new FileOutputStream(handle));
-
- writer.writeLong(message.getArrivalTime());
-
- MessagePublishInfo mpi = message.getMessagePublishInfo();
- writer.writeUTF(String.valueOf(mpi.getExchange()));
- writer.writeUTF(String.valueOf(mpi.getRoutingKey()));
- writer.writeBoolean(mpi.isMandatory());
- writer.writeBoolean(mpi.isImmediate());
- ContentHeaderBody chb = message.getContentHeaderBody();
-
- // write out the content header body
- final int bodySize = chb.getSize();
- byte[] underlying = new byte[bodySize];
- ByteBuffer buf = ByteBuffer.wrap(underlying);
- chb.writePayload(buf);
-
- writer.writeInt(bodySize);
- writer.write(underlying, 0, bodySize);
-
- int bodyCount = message.getBodyCount();
- writer.writeInt(bodyCount);
-
- //WriteContentBody
- for (int index = 0; index < bodyCount; index++)
- {
- ContentChunk chunk = message.getContentChunk(index);
- int length = chunk.getSize();
-
- byte[] chunk_underlying = new byte[length];
-
- ByteBuffer chunk_buf = chunk.getData();
-
- chunk_buf.duplicate().rewind().get(chunk_underlying);
-
- writer.writeInt(length);
- writer.write(chunk_underlying, 0, length);
- }
- }
- catch (FileNotFoundException e)
- {
- error = e;
- }
- catch (IOException e)
- {
- error = e;
- }
- finally
- {
- // In a FileNotFound situation writer will be null.
- if (writer != null)
- {
- try
- {
- writer.flush();
- writer.close();
- }
- catch (IOException e)
- {
- error = e;
- }
- }
- }
-
- if (error != null)
- {
- _log.error("Unable to unload message(" + messageId + ") to disk, restoring state.");
- handle.delete();
- throw new UnableToFlowMessageException(messageId, error);
- }
- }
- }
-
- /**
- * Use the messageId to calculate the file path on disk.
- *
- * Current implementation will give us 256 bins.
- * Therefore the maximum messages that can be flowed before error/platform is:
- * ext3 : 256 bins * 32000 = 8192000
- * FAT32 : 256 bins * 65534 = 16776704
- * Other FS have much greater limits than we need to worry about.
- *
- * @param messageId the Message we need a file Handle for.
- *
- * @return the File handle
- */
- private File getFileHandle(long messageId)
- {
- // grab the 8 LSB to give us 256 bins
- long bin = messageId & 0xFFL;
-
- String bin_path = _flowToDiskLocation + File.separator + bin;
- File bin_dir = new File(bin_path);
-
- if (!bin_dir.exists())
- {
- bin_dir.mkdirs();
- }
-
- String id = bin_path + File.separator + messageId;
-
- return new File(id);
- }
-
- public void delete(Long messageId)
- {
- File handle = getFileHandle(messageId);
-
- if (handle.exists())
- {
- if (_log.isInfoEnabled())
- {
- _log.info("Message(" + messageId + ") delete flowToDisk.");
- }
- if (!handle.delete())
- {
- throw new RuntimeException("Unable to delete flowToDisk data");
- }
- }
- }
-
- public void close()
- {
- _log.info("Closing Backing store at:" + _flowToDiskLocation);
- if (!FileUtils.delete(new File(_flowToDiskLocation), true))
- {
- // Attempting a second time appears to ensure that it is deleted.
- if (!FileUtils.delete(new File(_flowToDiskLocation), true))
- {
- _log.error("Unable to fully delete backing store location");
- }
- }
- }
-
- private class RecoverDataBuffer implements ContentChunk
- {
- private int _length;
- private ByteBuffer _dataBuffer;
-
- public RecoverDataBuffer(int length, byte[] data)
- {
- _length = length;
- _dataBuffer = ByteBuffer.wrap(data);
- }
-
- public int getSize()
- {
- return _length;
- }
-
- public ByteBuffer getData()
- {
- return _dataBuffer;
- }
-
- public void reduceToFit()
- {
-
- }
-
- }
-
-}
-
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java
deleted file mode 100644
index 8981db0071..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.util.FileUtils;
-
-import java.io.File;
-
-public class FileQueueBackingStoreFactory implements QueueBackingStoreFactory
-{
- private static final Logger _log = Logger.getLogger(FileQueueBackingStoreFactory.class);
-
- private String _flowToDiskLocation;
- public static final String QUEUE_BACKING_DIR = "queueBacking";
-
- public void configure(VirtualHost virtualHost, VirtualHostConfiguration config) throws ConfigurationException
- {
- setFlowToDisk(virtualHost.getName(), config.getFlowToDiskLocation());
- }
-
- private void setFlowToDisk(String vHostName, String location) throws ConfigurationException
- {
- if (vHostName == null)
- {
- throw new ConfigurationException("Unable to setup to Flow to Disk as Virtualhost name was not specified");
- }
-
- if (location == null)
- {
- throw new ConfigurationException("Unable to setup to Flow to Disk as location was not specified.");
- }
-
- _flowToDiskLocation = location;
-
- _flowToDiskLocation += File.separator + QUEUE_BACKING_DIR + File.separator + vHostName;
-
- //Check the location we will create QUEUE_BACKING_DIR in.
- File root = new File(location);
- if (!root.exists())
- {
- throw new ConfigurationException("Specified Flow to Disk root does not exist:" + root.getAbsolutePath());
- }
- else
- {
-
- if (root.isFile())
- {
- throw new ConfigurationException("Unable to create Temporary Flow to Disk store as specified root is a file:" +
- root.getAbsolutePath());
- }
-
- if (!root.canWrite())
- {
- throw new ConfigurationException("Unable to create Temporary Flow to Disk store. Unable to write to specified root:" +
- root.getAbsolutePath());
- }
-
- }
-
- // if we don't mark QUEUE_BAKCING_DIR as a deleteOnExit it will remain.
- File backingDir = new File(location + File.separator + QUEUE_BACKING_DIR);
- if (backingDir.exists())
- {
- if (!FileUtils.delete(backingDir, true))
- {
- throw new ConfigurationException("Unable to delete existing Flow to Disk root at:"
- + backingDir.getAbsolutePath());
- }
-
- if (backingDir.isFile())
- {
- throw new ConfigurationException("Unable to create Temporary Flow to Disk root as specified location is a file:" +
- backingDir.getAbsolutePath());
- }
- }
-
- backingDir.deleteOnExit();
- if (!backingDir.mkdirs())
- {
- throw new ConfigurationException("Unable to create Temporary Flow to Disk root:" + location + File.separator + QUEUE_BACKING_DIR);
- }
-
-
- File store = new File(_flowToDiskLocation);
- if (store.exists())
- {
- if (!FileUtils.delete(store, true))
- {
- throw new ConfigurationException("Unable to delete existing Flow to Disk store at:"
- + store.getAbsolutePath());
- }
-
- if (store.isFile())
- {
- throw new ConfigurationException("Unable to create Temporary Flow to Disk store as specified location is a file:" +
- store.getAbsolutePath());
- }
-
- }
-
- _log.info("Creating Flow to Disk Store : " + store.getAbsolutePath());
- store.deleteOnExit();
- if (!store.mkdir())
- {
- throw new ConfigurationException("Unable to create Temporary Flow to Disk store:" + store.getAbsolutePath());
- }
- }
-
- public QueueBackingStore createBacking(AMQQueue queue)
- {
- return new FileQueueBackingStore(createStore(queue.getName().toString()));
- }
-
- private String createStore(String name)
- {
- return createStore(name, 0);
- }
-
- /**
- * Returns a hash code for non-null Object x.
- * Uses the same hash code spreader as most other java.util hash tables.
- *
- * Borrowed from the Apache Harmony project
- * @param x the object serving as a key
- * @return the hash code
- */
- public static int hash(Object x) {
- int h = x.hashCode();
- h += ~(h << 9);
- h ^= (h >>> 14);
- h += (h << 4);
- h ^= (h >>> 10);
- return h;
- }
-
- private String createStore(String name, int index)
- {
-
- int hash = hash(name);
-
- long bin = hash & 0xFFL;
-
- String store = _flowToDiskLocation + File.separator + bin + File.separator + name;
-
- if (index > 0)
- {
- store += "-" + index;
- }
-
- //TODO ensure name is safe for the OS i.e. on OSX you can't have any ':'
- // Does java take care of this?
-
- File storeFile = new File(store);
-
- if (storeFile.exists())
- {
- return createStore(name, index + 1);
- }
-
- // Ensure we report an error if we cannot create the backing store.
- if (!storeFile.mkdirs())
- {
- _log.error("Unable to create queue backing directory for queue:" + name);
- throw new RuntimeException("Unable to create queue backing directory for queue:" + name);
- }
-
- storeFile.deleteOnExit();
-
- return store;
- }
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java
deleted file mode 100644
index d38932bb61..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.AMQException;
-
-public interface Filterable<E extends Exception>
-{
- ContentHeaderBody getContentHeaderBody() throws E;
-
- boolean isPersistent() throws E;
-
- boolean isRedelivered();
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
deleted file mode 100644
index b252c7304e..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
+++ /dev/null
@@ -1,550 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.pool.ReferenceCountingExecutorService;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-/** This is an abstract base class to handle */
-public abstract class FlowableBaseQueueEntryList implements QueueEntryList
-{
- protected static final Logger _log = Logger.getLogger(FlowableBaseQueueEntryList.class);
-
- private final AtomicInteger _atomicQueueCount = new AtomicInteger(0);
- private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
- protected final AtomicLong _atomicQueueInMemory = new AtomicLong(0L);
- /** The maximum amount of memory that is allocated to this queue. Beyond this the queue will flow to disk. */
-
- protected long _memoryUsageMaximum = -1L;
-
- /** The minimum amount of memory that is allocated to this queue. If the queueDepth hits this level then more flowed data can be read in. */
- protected long _memoryUsageMinimum = 0;
- private volatile AtomicBoolean _flowed;
- private QueueBackingStore _backingStore;
- protected AMQQueue _queue;
- private Executor _inhaler;
- private Executor _purger;
- private AtomicBoolean _stopped;
- private AtomicReference<MessageInhaler> _asynchronousInhaler = new AtomicReference(null);
- protected boolean _disableFlowToDisk;
- private AtomicReference<MessagePurger> _asynchronousPurger = new AtomicReference(null);
- private static final int BATCH_PROCESS_COUNT = 100;
- protected FlowableBaseQueueEntryList _parentQueue;
-
- FlowableBaseQueueEntryList(AMQQueue queue)
- {
- _queue = queue;
- _flowed = new AtomicBoolean(false);
- VirtualHost vhost = queue.getVirtualHost();
- if (vhost != null)
- {
- _backingStore = vhost.getQueueBackingStoreFactory().createBacking(queue);
- }
-
- _stopped = new AtomicBoolean(false);
- _inhaler = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
- _purger = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
- _disableFlowToDisk = true;
- }
-
- public void setFlowed(boolean flowed)
- {
- if (_flowed.get() != flowed)
- {
- _log.warn("Marking Queue(" + _queue.getName() + ") as flowed (" + flowed + ")");
- _flowed.set(flowed);
- }
- }
-
- protected void showUsage()
- {
- showUsage("");
- }
-
- protected void showUsage(String prefix)
- {
- if (_log.isTraceEnabled())
- {
- _log.trace(prefix + " Queue(" + _queue.getName() + ") usage:" + memoryUsed()
- + "/" + getMemoryUsageMinimum() + "<>" + getMemoryUsageMaximum()
- + "/" + dataSize());
- }
- }
-
- public boolean isFlowed()
- {
- if (_parentQueue != null)
- {
- return _parentQueue.isFlowed();
- }
- else
- {
- return _flowed.get();
- }
- }
-
- public int size()
- {
- return _atomicQueueCount.get();
- }
-
- public long dataSize()
- {
- return _atomicQueueSize.get();
- }
-
- public long memoryUsed()
- {
- return _atomicQueueInMemory.get();
- }
-
- public void setMemoryUsageMaximum(long maximumMemoryUsage)
- {
- _memoryUsageMaximum = maximumMemoryUsage;
-
- if (maximumMemoryUsage >= 0)
- {
- _disableFlowToDisk = false;
- }
-
- // Don't attempt to start the inhaler/purger unless we have a minimum value specified.
- if (_memoryUsageMaximum >= 0)
- {
- setMemoryUsageMinimum(_memoryUsageMaximum / 2);
-
- // if we have now have to much memory in use we need to purge.
- if (_memoryUsageMaximum < _atomicQueueInMemory.get())
- {
- setFlowed(true);
- startPurger();
- }
- }
- else
- {
- if (_log.isInfoEnabled())
- {
- _log.info("Disabling Flow to Disk for queue:" + _queue.getName());
- }
- _disableFlowToDisk = true;
- }
- }
-
- public long getMemoryUsageMaximum()
- {
- return _memoryUsageMaximum;
- }
-
- public void setMemoryUsageMinimum(long minimumMemoryUsage)
- {
- _memoryUsageMinimum = minimumMemoryUsage;
-
- // Don't attempt to start the inhaler unless we have a minimum value specified.
- if (_memoryUsageMinimum > 0)
- {
- checkAndStartInhaler();
- }
- }
-
- private void checkAndStartInhaler()
- {
- // If we've increased the minimum memory above what we have in memory then
- // we need to inhale more if there is more
- if (!_disableFlowToDisk && _atomicQueueInMemory.get() < _memoryUsageMinimum && _atomicQueueSize.get() > 0)
- {
- startInhaler();
- }
- }
-
- private void startInhaler()
- {
- MessageInhaler inhaler = new MessageInhaler();
-
- if (_asynchronousInhaler.compareAndSet(null, inhaler))
- {
- _inhaler.execute(inhaler);
- }
- }
-
- private void startPurger()
- {
- MessagePurger purger = new MessagePurger();
-
- if (_asynchronousPurger.compareAndSet(null, purger))
- {
- _purger.execute(purger);
- }
- }
-
- public long getMemoryUsageMinimum()
- {
- return _memoryUsageMinimum;
- }
-
- /**
- * Only to be called by the QueueEntry
- *
- * @param queueEntry the entry to unload
- */
- public void entryUnloadedUpdateMemory(QueueEntry queueEntry)
- {
- if (_parentQueue != null)
- {
- _parentQueue.entryUnloadedUpdateMemory(queueEntry);
- }
- else
- {
- if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
- {
- _log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity());
- }
-
- checkAndStartInhaler();
- }
- }
-
- /**
- * Only to be called from the QueueEntry
- *
- * @param queueEntry the entry to load
- */
- public void entryLoadedUpdateMemory(QueueEntry queueEntry)
- {
- if (_parentQueue != null)
- {
- _parentQueue.entryLoadedUpdateMemory(queueEntry);
- }
- else
- {
- if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum)
- {
- _log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum);
- setFlowed(true);
- startPurger();
- }
- }
- }
-
- public void stop()
- {
- if (!_stopped.getAndSet(true))
- {
- // The SimpleAMQQueue keeps running when stopped so we should just release the services
- // rather than actively shutdown our threads.
- //Shutdown thread for inhaler.
- ReferenceCountingExecutorService.getInstance().releaseExecutorService();
- ReferenceCountingExecutorService.getInstance().releaseExecutorService();
-
- _backingStore.close();
- }
- }
-
- /**
- * Mark this queue as part of another QueueEntryList for accounting purposes.
- *
- * All Calls from the QueueEntry to the QueueEntryList need to check if there is
- * a parent QueueEntrylist upon which the action should take place.
- *
- * @param queueEntryList The parent queue that is performing accounting.
- */
- public void setParentQueueEntryList(FlowableBaseQueueEntryList queueEntryList)
- {
- _parentQueue = queueEntryList;
- }
-
- protected void incrementCounters(final QueueEntryImpl queueEntry)
- {
- if (_parentQueue != null)
- {
- _parentQueue.incrementCounters(queueEntry);
- }
- else
- {
- _atomicQueueCount.incrementAndGet();
- _atomicQueueSize.addAndGet(queueEntry.getSize());
- long inUseMemory = _atomicQueueInMemory.addAndGet(queueEntry.getSize());
-
- if (!_disableFlowToDisk && inUseMemory > _memoryUsageMaximum)
- {
- setFlowed(true);
- queueEntry.unload();
- }
- }
- }
-
- protected void dequeued(QueueEntryImpl queueEntry)
- {
- if (_parentQueue != null)
- {
- _parentQueue.dequeued(queueEntry);
- }
- else
- {
- _atomicQueueCount.decrementAndGet();
- _atomicQueueSize.addAndGet(-queueEntry.getSize());
- if (!queueEntry.isFlowed())
- {
- if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
- {
- _log.error("InMemory Count just went below 0 on dequeue.");
- }
- }
- }
- }
-
- public QueueBackingStore getBackingStore()
- {
- return _backingStore;
- }
-
- private class MessageInhaler implements Runnable
- {
- public void run()
- {
- String threadName = Thread.currentThread().getName();
- Thread.currentThread().setName("Inhaler-" + _queue.getVirtualHost().getName() + "-" + _queue.getName());
- try
- {
- inhaleList(this);
- }
- finally
- {
- Thread.currentThread().setName(threadName);
- }
- }
- }
-
- private void inhaleList(MessageInhaler messageInhaler)
- {
- if (_log.isInfoEnabled())
- {
- _log.info("Inhaler Running:" + _queue.getName());
- showUsage("Inhaler Running:" + _queue.getName());
- }
- // If in memory count is at or over max then we can't inhale
- if (_atomicQueueInMemory.get() >= _memoryUsageMaximum)
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Unable to start inhaling as we are already over quota:" +
- _atomicQueueInMemory.get() + ">=" + _memoryUsageMaximum);
- }
- return;
- }
-
- _asynchronousInhaler.compareAndSet(messageInhaler, null);
- int inhaled = 1;
-
- //Because we may not be able to totally fill up to _memoryUsageMaximum we need to be able to say we've done
- // enough loading and this inhale process should stop
- boolean finshedInhaling = false;
-
- while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) // we havn't filled our max memory
- && !finshedInhaling // Have we loaded all we can fit into memory
- && (_atomicQueueInMemory.get() < _atomicQueueSize.get()) // we haven't loaded all that is available
- && (inhaled < BATCH_PROCESS_COUNT) // limit the number of runs we do
- && (inhaled > 0) // ensure we could inhale something
- && _asynchronousInhaler.compareAndSet(null, messageInhaler)) // Ensure we are the running inhaler
- {
- inhaled = 0;
- QueueEntryIterator iterator = iterator();
-
- // If the inhaler is running and delivery rate picks up ensure that we just don't chase the delivery thread.
- while ((_atomicQueueInMemory.get() < _memoryUsageMaximum)
- && !iterator.getNode().isAvailable() && iterator.advance())
- {
- //Find first AVAILABLE node
- }
-
- // Because the above loop checks then moves on to the next entry a check for atTail will return true but
- // we won't have checked the last entry to see if we can load it. So create atEndofList and update it based
- // on the return from advance() which returns true if it can advance.
- boolean atEndofList = false;
-
- while ((_atomicQueueInMemory.get() <= _memoryUsageMaximum) // we haven't filled our max memory
- && !finshedInhaling // Have we loaded all we can fit into memory
- && (inhaled < BATCH_PROCESS_COUNT) // limit the number of runs we do
- && !atEndofList) // We have reached end of list QueueEntries
- {
- QueueEntry entry = iterator.getNode();
-
- if (entry.isAvailable() && entry.isFlowed())
- {
- if (_atomicQueueInMemory.get() + entry.getSize() > _memoryUsageMaximum)
- {
- // We don't have space for this message so we need to stop inhaling.
- if (_log.isDebugEnabled())
- {
- _log.debug("Entry won't fit in memory stopping inhaler:" + entry.debugIdentity());
- }
- finshedInhaling = true;
- }
- else
- {
- entry.load();
- inhaled++;
- }
- }
-
- atEndofList = !iterator.advance();
- }
-
- if (iterator.atTail())
- {
- setFlowed(false);
- }
-
- _asynchronousInhaler.set(null);
- }
-
- if (_log.isInfoEnabled())
- {
- _log.info("Inhaler Stopping:" + _queue.getName());
- showUsage("Inhaler Stopping:" + _queue.getName());
- }
-
- //If we have become flowed or have more capacity since we stopped then schedule the thread to run again.
- if (!finshedInhaling && _flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum)
- {
- if (_log.isInfoEnabled())
- {
- _log.info("Rescheduling Inhaler:" + _queue.getName());
- }
- _inhaler.execute(messageInhaler);
- }
-
- }
-
- private class MessagePurger implements Runnable
- {
- public void run()
- {
- String threadName = Thread.currentThread().getName();
- Thread.currentThread().setName("Purger-" + _queue.getVirtualHost().getName() + "-" + _queue.getName());
- try
- {
- purgeList(this);
- }
- finally
- {
- Thread.currentThread().setName(threadName);
- }
- }
- }
-
- private void purgeList(MessagePurger messagePurger)
- {
- // If in memory count is at or over max then we can't inhale
- if (_atomicQueueInMemory.get() <= _memoryUsageMinimum)
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Unable to start purging as we are already below our minimum cache level:" +
- _atomicQueueInMemory.get() + "<=" + _memoryUsageMinimum);
- }
- return;
- }
-
- if (_log.isInfoEnabled())
- {
- _log.info("Purger Running:" + _queue.getName());
- showUsage("Purger Running:" + _queue.getName());
- }
-
- _asynchronousPurger.compareAndSet(messagePurger, null);
- int purged = 0;
-
- while ((_atomicQueueInMemory.get() > _memoryUsageMaximum)
- && purged < BATCH_PROCESS_COUNT
- && _asynchronousPurger.compareAndSet(null, messagePurger))
- {
- QueueEntryIterator iterator = iterator();
-
- //There are potentially AQUIRED messages that can be purged but we can't purge the last AQUIRED message
- // as it may have just become AQUIRED and not yet delivered.
-
- //To be safe only purge available messages. This should be fine as long as we have a small prefetch.
- while (!iterator.getNode().isAvailable() && iterator.advance())
- {
- //Find first AVAILABLE node
- }
-
- // Count up the memory usage to find our minimum point
- long memoryUsage = 0;
- boolean atTail = false;
- while ((memoryUsage < _memoryUsageMaximum) && !atTail)
- {
- QueueEntry entry = iterator.getNode();
-
- if (entry.isAvailable() && !entry.isFlowed())
- {
- memoryUsage += entry.getSize();
- // If this message is what puts us over the limit then break
- // out of this loop as we need to purge this item.
- if (memoryUsage > _memoryUsageMaximum)
- {
- break;
- }
- }
-
- atTail = !iterator.advance();
- }
-
- //Purge remainging mesages on queue
- while (!atTail && (purged < BATCH_PROCESS_COUNT))
- {
- QueueEntry entry = iterator.getNode();
-
- if (entry.isAvailable() && !entry.isFlowed())
- {
- entry.unload();
- purged++;
- }
-
- atTail = !iterator.advance();
- }
-
- _asynchronousPurger.set(null);
- }
-
- if (_log.isInfoEnabled())
- {
- _log.info("Purger Stopping:" + _queue.getName());
- showUsage("Purger Stopping:" + _queue.getName());
- }
-
- //If we are still flowed and are over the minimum value then schedule to run again.
- if (_flowed.get() && _atomicQueueInMemory.get() > _memoryUsageMaximum)
- {
- if (_log.isInfoEnabled())
- {
- _log.info("Rescheduling Purger:" + _queue.getName());
- }
- _purger.execute(messagePurger);
- }
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
deleted file mode 100644
index 36ca197fa6..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.exchange.NoRouteException;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.transactionlog.TransactionLog;
-import org.apache.qpid.AMQException;
-import org.apache.log4j.Logger;
-
-import java.util.ArrayList;
-
-public class IncomingMessage implements Filterable<RuntimeException>
-{
-
- /** Used for debugging purposes. */
- private static final Logger _logger = Logger.getLogger(IncomingMessage.class);
-
- private static final boolean SYNCHED_CLOCKS =
- ApplicationRegistry.getInstance().getConfiguration().getSynchedClocks();
-
- private final MessagePublishInfo _messagePublishInfo;
- private ContentHeaderBody _contentHeaderBody;
- private AMQMessage _message;
- private final TransactionalContext _txnContext;
-
- private static final boolean MSG_AUTH =
- ApplicationRegistry.getInstance().getConfiguration().getMsgAuth();
-
-
- /**
- * Keeps a track of how many bytes we have received in body frames
- */
- private long _bodyLengthReceived = 0;
-
- /**
- * This is stored during routing, to know the queues to which this message should immediately be
- * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
- * by the message handle.
- */
- private ArrayList<AMQQueue> _destinationQueues;
-
- private AMQProtocolSession _publisher;
- private TransactionLog _messageStore;
- private long _expiration;
-
- private Exchange _exchange;
- private static MessageFactory MESSAGE_FACTORY = MessageFactory.getInstance();
-
- public IncomingMessage(final MessagePublishInfo info,
- final TransactionalContext txnContext,
- final AMQProtocolSession publisher,
- TransactionLog messasgeStore)
- {
- if (publisher == null)
- {
- throw new NullPointerException("Message Publisher cannot be null");
- }
- _messagePublishInfo = info;
- _txnContext = txnContext;
- _publisher = publisher;
- _messageStore = messasgeStore;
- }
-
- public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException
- {
- _contentHeaderBody = contentHeaderBody;
- _message = MESSAGE_FACTORY.createMessage(_messageStore, isPersistent());
- }
-
- public void setExpiration()
- {
- long expiration =
- ((BasicContentHeaderProperties) _contentHeaderBody.properties).getExpiration();
- long timestamp =
- ((BasicContentHeaderProperties) _contentHeaderBody.properties).getTimestamp();
-
- if (SYNCHED_CLOCKS)
- {
- _expiration = expiration;
- }
- else
- {
- // Update TTL to be in broker time.
- if (expiration != 0L)
- {
- if (timestamp != 0L)
- {
- // todo perhaps use arrival time
- long diff = (System.currentTimeMillis() - timestamp);
-
- if ((diff > 1000L) || (diff < 1000L))
- {
- _expiration = expiration + diff;
- }
- }
- }
- }
-
- }
-
- public void routingComplete(final TransactionLog transactionLog) throws AMQException
- {
-
- if (isPersistent())
- {
- _txnContext.beginTranIfNecessary();
- // enqueuing the messages ensure that if required the destinations are recorded to a
- // persistent store
-
- if(_destinationQueues != null)
- {
- transactionLog.enqueueMessage(_txnContext.getStoreContext(), _destinationQueues, getMessageId());
- }
- }
- }
-
- public AMQMessage deliverToQueues()
- throws AMQException
- {
-
- // we get a reference to the destination queues now so that we can clear the
- // transient message data as quickly as possible
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Delivering message " + getMessageId() + " to " + _destinationQueues);
- }
-
-
- // first we allow the handle to know that the message has been fully received. This is useful if it is
- // maintaining any calculated values based on content chunks
- _message.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), _messagePublishInfo, getContentHeaderBody());
-
-
-
- _message.setExpiration(_expiration);
- _message.setClientIdentifier(_publisher.getSessionIdentifier());
-
- // we then allow the transactional context to do something with the message content
- // now that it has all been received, before we attempt delivery
- _txnContext.messageFullyReceived(isPersistent());
-
- AMQShortString userID = getContentHeaderBody().properties instanceof BasicContentHeaderProperties ?
- ((BasicContentHeaderProperties) getContentHeaderBody().properties).getUserId() : null;
-
- if (MSG_AUTH && !_publisher.getAuthorizedID().getName().equals(userID == null? "" : userID.toString()))
- {
- throw new UnauthorizedAccessException("Acccess Refused", _message);
- }
-
- if ((_destinationQueues == null) || _destinationQueues.size() == 0)
- {
-
- if (isMandatory() || isImmediate())
- {
- throw new NoRouteException("No Route for message", _message);
-
- }
- else
- {
- _logger.warn("MESSAGE DISCARDED: No routes for message - " + _message);
- }
- }
- else
- {
- int offset;
- final int queueCount = _destinationQueues.size();
- if(queueCount == 1)
- {
- offset = 0;
- }
- else
- {
- offset = ((int)(_message.getMessageId().longValue())) % queueCount;
- if(offset < 0)
- {
- offset = -offset;
- }
- }
- for (int i = offset; i < queueCount; i++)
- {
- // normal deliver so add this message at the end.
- _txnContext.deliver(_destinationQueues.get(i), _message);
- }
- for (int i = 0; i < offset; i++)
- {
- // normal deliver so add this message at the end.
- _txnContext.deliver(_destinationQueues.get(i), _message);
- }
- }
-
- return _message;
-
-
-
- }
-
- public void addContentBodyFrame(final ContentChunk contentChunk)
- throws AMQException
- {
-
- _bodyLengthReceived += contentChunk.getSize();
-
- _message.addContentBodyFrame(_txnContext.getStoreContext(), contentChunk, allContentReceived());
-
- }
-
- public boolean allContentReceived()
- {
- return (_bodyLengthReceived == getContentHeaderBody().bodySize);
- }
-
- public AMQShortString getExchange() throws AMQException
- {
- return _messagePublishInfo.getExchange();
- }
-
- public AMQShortString getRoutingKey() throws AMQException
- {
- return _messagePublishInfo.getRoutingKey();
- }
-
- public boolean isMandatory() throws AMQException
- {
- return _messagePublishInfo.isMandatory();
- }
-
-
- public boolean isImmediate() throws AMQException
- {
- return _messagePublishInfo.isImmediate();
- }
-
- public ContentHeaderBody getContentHeaderBody()
- {
- return _contentHeaderBody;
- }
-
-
- public boolean isPersistent()
- {
- return getContentHeaderBody().properties instanceof BasicContentHeaderProperties &&
- ((BasicContentHeaderProperties) getContentHeaderBody().properties).getDeliveryMode() ==
- BasicContentHeaderProperties.PERSISTENT;
- }
-
- public boolean isRedelivered()
- {
- return false;
- }
-
- /**
- * The message ID will not be assigned until the ContentHeaderBody has arrived.
- * @return
- */
- public Long getMessageId()
- {
- return _message.getMessageId();
- }
-
- public void setExchange(final Exchange e)
- {
- _exchange = e;
- }
-
- public void route() throws AMQException
- {
- _exchange.route(this);
- }
-
- public void enqueue(final ArrayList<AMQQueue> queues)
- {
- _destinationQueues = queues;
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
deleted file mode 100644
index d91d45a446..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import java.io.IOException;
-
-import javax.management.JMException;
-import javax.management.MBeanOperationInfo;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.TabularData;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.management.MBeanAttribute;
-import org.apache.qpid.server.management.MBeanOperation;
-import org.apache.qpid.server.management.MBeanOperationParameter;
-
-/**
- * The management interface exposed to allow management of a queue.
- * @author Robert J. Greig
- * @author Bhupendra Bhardwaj
- * @version 0.1
- */
-public interface ManagedQueue
-{
- static final String TYPE = "Queue";
- static final int VERSION = 2;
-
- /**
- * Returns the Name of the ManagedQueue.
- * @return the name of the managedQueue.
- * @throws IOException
- */
- @MBeanAttribute(name="Name", description = TYPE + " Name")
- String getName() throws IOException;
-
- /**
- * Total number of messages on the queue, which are yet to be delivered to the consumer(s).
- * @return number of undelivered message in the Queue.
- * @throws IOException
- */
- @MBeanAttribute(name="MessageCount", description = "Total number of undelivered messages on the queue")
- Integer getMessageCount() throws IOException;
-
- /**
- * Tells the total number of messages receieved by the queue since startup.
- * @return total number of messages received.
- * @throws IOException
- */
- @MBeanAttribute(name="ReceivedMessageCount", description="The total number of messages receieved by the queue since startup")
- Long getReceivedMessageCount() throws IOException;
-
- /**
- * Size of messages in the queue
- * @return
- * @throws IOException
- */
- @MBeanAttribute(name="QueueDepth", description="The total size(Bytes) of messages in the queue")
- Long getQueueDepth() throws IOException, JMException;
-
- /**
- * Returns the total number of active subscribers to the queue.
- * @return the number of active subscribers
- * @throws IOException
- */
- @MBeanAttribute(name="ActiveConsumerCount", description="The total number of active subscribers to the queue")
- Integer getActiveConsumerCount() throws IOException;
-
- /**
- * Returns the total number of subscribers to the queue.
- * @return the number of subscribers.
- * @throws IOException
- */
- @MBeanAttribute(name="ConsumerCount", description="The total number of subscribers to the queue")
- Integer getConsumerCount() throws IOException;
-
- /**
- * Tells the Owner of the ManagedQueue.
- * @return the owner's name.
- * @throws IOException
- */
- @MBeanAttribute(name="Owner", description = "Owner")
- String getOwner() throws IOException;
-
- /**
- * Tells whether this ManagedQueue is durable or not.
- * @return true if this ManagedQueue is a durable queue.
- * @throws IOException
- */
- @MBeanAttribute(name="Durable", description = "true if the AMQQueue is durable")
- boolean isDurable() throws IOException;
-
- /**
- * Tells if the ManagedQueue is set to AutoDelete.
- * @return true if the ManagedQueue is set to AutoDelete.
- * @throws IOException
- */
- @MBeanAttribute(name="AutoDelete", description = "true if the AMQQueue is AutoDelete")
- boolean isAutoDelete() throws IOException;
-
- /**
- * Returns the maximum age of a message (expiration time) in milliseconds
- * @return the maximum age
- * @throws IOException
- */
- Long getMaximumMessageAge() throws IOException;
-
- /**
- * Sets the maximum age of a message in milliseconds
- * @param age maximum age of message.
- * @throws IOException
- */
- @MBeanAttribute(name="MaximumMessageAge", description="Threshold high value(milliseconds) for message age")
- void setMaximumMessageAge(Long age) throws IOException;
-
- /**
- * Returns the maximum size of a message (in Bytes) allowed to be accepted by the
- * ManagedQueue. This is useful in setting notifications or taking
- * appropriate action, if the size of the message received is more than
- * the allowed size.
- * @return the maximum size of a message allowed to be aceepted by the
- * ManagedQueue.
- * @throws IOException
- */
- Long getMaximumMessageSize() throws IOException;
-
- /**
- * Sets the maximum size of the message (in Bytes) that is allowed to be
- * accepted by the Queue.
- * @param size maximum size of message.
- * @throws IOException
- */
- @MBeanAttribute(name="MaximumMessageSize", description="Threshold high value(Bytes) for a message size")
- void setMaximumMessageSize(Long size) throws IOException;
-
- /**
- * Tells the maximum number of messages that can be stored in the queue.
- * This is useful in setting the notifications or taking required
- * action is the number of message increase this limit.
- * @return maximum muber of message allowed to be stored in the queue.
- * @throws IOException
- */
- Long getMaximumMessageCount() throws IOException;
-
- /**
- * Sets the maximum number of messages allowed to be stored in the queue.
- * @param value the maximum number of messages allowed to be stored in the queue.
- * @throws IOException
- */
- @MBeanAttribute(name="MaximumMessageCount", description="Threshold high value for number of undelivered messages in the queue")
- void setMaximumMessageCount(Long value) throws IOException;
-
- /**
- * This is useful for setting notifications or taking required action if the size of messages
- * stored in the queue increases over this limit.
- * @return threshold high value for Queue Depth
- * @throws IOException
- */
- Long getMaximumQueueDepth() throws IOException;
-
- /**
- * Sets the maximum size of all the messages together, that can be stored
- * in the queue.
- * @param value
- * @throws IOException
- */
- @MBeanAttribute(name="MaximumQueueDepth", description="The threshold high value(Bytes) for Queue Depth")
- void setMaximumQueueDepth(Long value) throws IOException;
-
- /**
- * View the limit on the memory that this queue will utilise.
- *
- * Used by Flow to Disk.
- *
- * @return The maximum memory(B) that the queue will occuy.
- */
- public Long getMemoryUsageMaximum();
-
- /**
- * Place a limit on the memory that this queue will utilise.
- *
- * Used by Flow to Disk
- *
- * @param maximumMemoryUsage The new maximum memory(B) to be used by this queue
- */
- @MBeanAttribute(name="MemoryUsageMaximum", description="The maximum memory(Bytes) that the queue will occupy.")
- public void setMemoryUsageMaximum(Long maximumMemoryUsage);
-
- /**
- * View the minimum amount of memory that has been defined for this queue.
- *
- * Used by Flow to Disk
- *
- * @return The minimum amount of queue data(B) that the queue will attempt to keep in memory
- */
- public Long getMemoryUsageMinimum();
-
- /**
- * Set the minimum amount of memory that has been defined for this queue.
- *
- * Used by Flow to Disk
- *
- * @param minimumMemoryUsage The new minimum memory(B) level to be used by this queue
- */
- @MBeanAttribute(name="MemoryUsageMinimum", description="The minimum memory(Bytes) that the queue will occupy.")
- public void setMemoryUsageMinimum(Long minimumMemoryUsage);
-
- /**
- * View the amount of memory(B) that this queue is using.
- *
- * @return The current memory(B) usage of this queue.
- */
- @MBeanAttribute(name="MemoryUsageCurrent", description="The current amount of memory(Bytes) used by this queue.")
- public Long getMemoryUsageCurrent();
-
- /**
- * When a queue exceeds its MemoryUsageMaximum value then the Queue will start flowing to disk.
- *
- * This boolean is used to show that change in state.
- *
- * @return true if the Queue is currently flowing to disk
- */
- @MBeanAttribute(name="isFlowed", description="true if the queue is currently flowing to disk.")
- public boolean isFlowed();
-
-
-
- //********** Operations *****************//
-
-
- /**
- * Returns a subset of all the messages stored in the queue. The messages
- * are returned based on the given index numbers.
- * @param fromIndex
- * @param toIndex
- * @return
- * @throws IOException
- * @throws JMException
- */
- @MBeanOperation(name="viewMessages",
- description="Message headers for messages in this queue within given index range. eg. from index 1 - 100")
- TabularData viewMessages(@MBeanOperationParameter(name="from index", description="from index")int fromIndex,
- @MBeanOperationParameter(name="to index", description="to index")int toIndex)
- throws IOException, JMException, AMQException;
-
- @MBeanOperation(name="viewMessageContent", description="The message content for given Message Id")
- CompositeData viewMessageContent(@MBeanOperationParameter(name="Message Id", description="Message Id")long messageId)
- throws IOException, JMException;
-
- /**
- * Deletes the first message from top.
- * @throws IOException
- * @throws JMException
- */
- @MBeanOperation(name="deleteMessageFromTop", description="Deletes the first message from top",
- impact= MBeanOperationInfo.ACTION)
- void deleteMessageFromTop() throws IOException, JMException;
-
- /**
- * Clears the queue by deleting all the undelivered messages from the queue.
- * @throws IOException
- * @throws JMException
- */
- @MBeanOperation(name="clearQueue",
- description="Clears the queue by deleting all the undelivered messages from the queue",
- impact= MBeanOperationInfo.ACTION)
- void clearQueue() throws IOException, JMException;
-
- /**
- * Moves the messages in given range of message Ids to given Queue. QPID-170
- * @param fromMessageId first in the range of message ids
- * @param toMessageId last in the range of message ids
- * @param toQueue where the messages are to be moved
- * @throws IOException
- * @throws JMException
- * @throws AMQException
- */
- @MBeanOperation(name="moveMessages",
- description="You can move messages to another queue from this queue ",
- impact= MBeanOperationInfo.ACTION)
- void moveMessages(@MBeanOperationParameter(name="from MessageId", description="from MessageId")long fromMessageId,
- @MBeanOperationParameter(name="to MessageId", description="to MessageId")long toMessageId,
- @MBeanOperationParameter(name= ManagedQueue.TYPE, description="to Queue Name")String toQueue)
- throws IOException, JMException, AMQException;
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java
deleted file mode 100644
index 090096d3c3..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-
-/**
- * MessageCleanupException represents the failure to perform reference counting on messages correctly. This should not
- * happen, but there may be programming errors giving race conditions that cause the reference counting to go wrong.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Signals that the reference count of a message has gone below zero.
- * <tr><td> Indicates that a message store has lost a message which is still referenced.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- *
- * @todo The race conditions leading to this error should be cleaned up, and a runtime exception used instead. If the
- * message store loses messages, then something is seriously wrong and it would be sensible to terminate the
- * broker. This may be disguising out of memory errors.
- */
-public class MessageCleanupException extends AMQException
-{
- public MessageCleanupException(long messageId, AMQException e)
- {
- super("Failed to cleanup message with id " + messageId, e);
- }
-
- public MessageCleanupException(String message)
- {
- super(message);
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
deleted file mode 100644
index 10e7dca18f..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.transactionlog.TransactionLog;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-public class MessageFactory
-{
- private AtomicLong _messageId;
- private static MessageFactory INSTANCE;
-
- private enum State
- {
- RECOVER,
- OPEN
- }
-
- private State _state = State.RECOVER;
-
- private MessageFactory()
- {
- _messageId = new AtomicLong(0L);
- }
-
- public void recoveryComplete()
- {
- _state = State.OPEN;
- }
-
- /**
- * Only to be used by tests as this will cause violate the principal that message IDs should not be reused.
- */
- public void reset()
- {
- _state = State.RECOVER;
- _messageId = new AtomicLong(0L);
- }
-
- /**
- * Normal message creation path
- * @param transactionLog
- * @param persistent
- * @return
- */
- public AMQMessage createMessage(TransactionLog transactionLog, boolean persistent)
- {
- if (_state != State.OPEN)
- {
- _state = State.OPEN;
- }
-
- return createNextMessage(_messageId.incrementAndGet(), transactionLog, persistent);
- }
-
- /**
- * Used for message recovery only and so only creates persistent messages.
- * @param messageId the id that this message must have
- * @param transactionLog
- * @return
- */
- public AMQMessage createMessage(Long messageId, TransactionLog transactionLog)
- {
- if (_state != State.RECOVER)
- {
- throw new RuntimeException("Unable to create message by ID when not recovering");
- }
-
- if (messageId < 0L)
- {
- throw new RuntimeException("Message IDs can only be positive. Requested:" + messageId);
- }
-
- _messageId.set((int)Math.max(messageId, _messageId.get()));
-
- return createNextMessage(messageId, transactionLog, true);
- }
-
- private AMQMessage createNextMessage(Long messageId, TransactionLog transactionLog, boolean persistent)
- {
- if (persistent)
- {
- return new PersistentAMQMessage(messageId, transactionLog);
- }
- else
- {
- return new TransientAMQMessage(messageId);
- }
- }
-
- public static MessageFactory getInstance()
- {
- if (INSTANCE == null)
- {
- INSTANCE = new MessageFactory();
- }
-
- return INSTANCE;
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
deleted file mode 100644
index 6118a4c11f..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-
-/**
- * Encapsulates a publish body and a content header. In the context of the message store these are treated as a
- * single unit.
- */
-public class MessageMetaData
-{
- private MessagePublishInfo _messagePublishInfo;
-
- private ContentHeaderBody _contentHeaderBody;
-
- private int _contentChunkCount;
-
- private long _arrivalTime;
-
- public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount)
- {
- this(publishBody,contentHeaderBody, contentChunkCount, System.currentTimeMillis());
- }
-
- public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount, long arrivalTime)
- {
- _contentHeaderBody = contentHeaderBody;
- _messagePublishInfo = publishBody;
- _contentChunkCount = contentChunkCount;
- _arrivalTime = arrivalTime;
- }
-
- public int getContentChunkCount()
- {
- return _contentChunkCount;
- }
-
- public void setContentChunkCount(int contentChunkCount)
- {
- _contentChunkCount = contentChunkCount;
- }
-
- public ContentHeaderBody getContentHeaderBody()
- {
- return _contentHeaderBody;
- }
-
- public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
- {
- _contentHeaderBody = contentHeaderBody;
- }
-
- public MessagePublishInfo getMessagePublishInfo()
- {
- return _messagePublishInfo;
- }
-
- public void setMessagePublishInfo(MessagePublishInfo messagePublishInfo)
- {
- _messagePublishInfo = messagePublishInfo;
- }
-
- public long getArrivalTime()
- {
- return _arrivalTime;
- }
-
- public void setArrivalTime(long arrivalTime)
- {
- _arrivalTime = arrivalTime;
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
deleted file mode 100644
index d6fd1eec89..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.RequiredDeliveryException;
-
-/**
- * NoConsumersException is a {@link RequiredDeliveryException} that represents the failure case where an immediate
- * message cannot be delivered because there are presently no consumers for the message. The AMQP status code, 313, is
- * always used to report this condition.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represent failure to deliver a message that must be delivered.
- * </table>
- */
-public class NoConsumersException extends RequiredDeliveryException
-{
- public NoConsumersException(AMQMessage message)
- {
- super("Immediate delivery is not possible.", message);
- }
-
- public AMQConstant getReplyCode()
- {
- return AMQConstant.NO_CONSUMERS;
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
deleted file mode 100644
index a83d661de2..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-public enum NotificationCheck
-{
-
- MESSAGE_COUNT_ALERT
- {
- boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue, QueueNotificationListener listener)
- {
- int msgCount;
- final long maximumMessageCount = queue.getMaximumMessageCount();
- if (maximumMessageCount!= 0 && (msgCount = queue.getMessageCount()) >= maximumMessageCount)
- {
- listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
- return true;
- }
- return false;
- }
- },
- MESSAGE_SIZE_ALERT(true)
- {
- boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue, QueueNotificationListener listener)
- {
- final long maximumMessageSize = queue.getMaximumMessageSize();
- if(maximumMessageSize != 0)
- {
- // Check for threshold message size
- long messageSize = (queueEntry == null) ? 0 : queueEntry.getSize();
-
- if (messageSize >= maximumMessageSize)
- {
- listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold (" +
- maximumMessageSize + ") breached. [Message ID=" +
- (queueEntry == null ? "null" : queueEntry.getMessageId()) + "]");
- return true;
- }
- }
- return false;
- }
-
- },
- QUEUE_DEPTH_ALERT
- {
- boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue, QueueNotificationListener listener)
- {
- // Check for threshold queue depth in bytes
- final long maximumQueueDepth = queue.getMaximumQueueDepth();
-
- if(maximumQueueDepth != 0)
- {
- final long queueDepth = queue.getQueueDepth();
-
- if (queueDepth >= maximumQueueDepth)
- {
- listener.notifyClients(this, queue, (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached.");
- return true;
- }
- }
- return false;
- }
-
- },
- MESSAGE_AGE_ALERT
- {
- boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue, QueueNotificationListener listener)
- {
-
- final long maxMessageAge = queue.getMaximumMessageAge();
- if(maxMessageAge != 0)
- {
- final long currentTime = System.currentTimeMillis();
- final long thresholdTime = currentTime - maxMessageAge;
- final long firstArrivalTime = queue.getOldestMessageArrivalTime();
-
- if(firstArrivalTime < thresholdTime)
- {
- long oldestAge = currentTime - firstArrivalTime;
- listener.notifyClients(this, queue, (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached.");
-
- return true;
- }
- }
- return false;
-
- }
-
- }
- ;
-
- private final boolean _messageSpecific;
-
- NotificationCheck()
- {
- this(false);
- }
-
- NotificationCheck(boolean messageSpecific)
- {
- _messageSpecific = messageSpecific;
- }
-
- public boolean isMessageSpecific()
- {
- return _messageSpecific;
- }
-
- abstract boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue, QueueNotificationListener listener);
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
deleted file mode 100644
index 9c644cc010..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.transactionlog.TransactionLog;
-
-public class PersistentAMQMessage extends TransientAMQMessage
-{
- protected TransactionLog _transactionLog;
-
- public PersistentAMQMessage(Long messageId, TransactionLog transactionLog)
- {
- super(messageId);
- _transactionLog = transactionLog;
- }
-
- @Override
- public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody)
- throws AMQException
- {
- super.addContentBodyFrame(storeContext, contentChunk, isLastContentBody);
- _transactionLog.storeContentBodyChunk(storeContext, _messageId, _contentBodies.size() - 1,
- contentChunk, isLastContentBody);
- }
-
- @Override
- public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo,
- ContentHeaderBody contentHeaderBody)
- throws AMQException
- {
- super.setPublishAndContentHeaderBody(storeContext, messagePublishInfo, contentHeaderBody);
- MessageMetaData mmd = new MessageMetaData(messagePublishInfo, contentHeaderBody,
- _contentBodies == null ? 0 : _contentBodies.size(), _arrivalTime);
-
- _transactionLog.storeMessageMetaData(storeContext, _messageId, mmd);
- }
-
- @Override
- public boolean isPersistent()
- {
- return true;
- }
-
- @Override
- public void recoverContentBodyFrame(ContentChunk contentChunk, boolean isLastContentBody) throws AMQException
- {
- super.addContentBodyFrame(null, contentChunk, isLastContentBody);
- }
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
deleted file mode 100644
index 83c7ebb4f2..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.framing.CommonContentHeaderProperties;
-
-public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implements QueueEntryList
-{
- private final AMQQueue _queue;
- private final QueueEntryList[] _priorityLists;
- private final int _priorities;
- private final int _priorityOffset;
-
- public PriorityQueueEntryList(AMQQueue queue, int priorities)
- {
- super(queue);
- _queue = queue;
- _priorityLists = new QueueEntryList[priorities];
- _priorities = priorities;
- _priorityOffset = 5 - ((priorities + 1) / 2);
- for (int i = 0; i < priorities; i++)
- {
- _priorityLists[i] = new SimpleQueueEntryList(queue);
- _priorityLists[i].setParentQueueEntryList(this);
- }
-
- showUsage("Created:" + _queue.getName());
- }
-
- public int getPriorities()
- {
- return _priorities;
- }
-
- public AMQQueue getQueue()
- {
- return _queue;
- }
-
- public QueueEntry add(AMQMessage message)
- {
- int index = ((CommonContentHeaderProperties) ((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
- if (index >= _priorities)
- {
- index = _priorities - 1;
- }
- else if (index < 0)
- {
- index = 0;
- }
-
- return _priorityLists[index].add(message);
- }
-
-
- public QueueEntry next(QueueEntry node)
- {
- QueueEntryImpl nodeImpl = (QueueEntryImpl) node;
- QueueEntry next = nodeImpl.getNext();
-
- if (next == null)
- {
- QueueEntryList nodeEntryList = nodeImpl.getQueueEntryList();
- int index;
- for (index = _priorityLists.length - 1; _priorityLists[index] != nodeEntryList; index--)
- {
- ;
- }
-
- while (next == null && index != 0)
- {
- index--;
- next = ((QueueEntryImpl) _priorityLists[index].getHead()).getNext();
- }
-
- }
- return next;
- }
-
- private final class PriorityQueueEntryListIterator implements QueueEntryIterator
- {
- private final QueueEntryIterator[] _iterators = new QueueEntryIterator[_priorityLists.length];
- private QueueEntry _lastNode;
-
- PriorityQueueEntryListIterator()
- {
- for (int i = 0; i < _priorityLists.length; i++)
- {
- _iterators[i] = _priorityLists[i].iterator();
- }
- _lastNode = _iterators[_iterators.length - 1].getNode();
- }
-
- public boolean atTail()
- {
- for (int i = 0; i < _iterators.length; i++)
- {
- if (!_iterators[i].atTail())
- {
- return false;
- }
- }
- return true;
- }
-
- public QueueEntry getNode()
- {
- return _lastNode;
- }
-
- public boolean advance()
- {
- for (int i = _iterators.length - 1; i >= 0; i--)
- {
- if (_iterators[i].advance())
- {
- _lastNode = _iterators[i].getNode();
- return true;
- }
- }
- return false;
- }
- }
-
- public QueueEntryIterator iterator()
- {
- return new PriorityQueueEntryListIterator();
- }
-
- public QueueEntry getHead()
- {
- return _priorityLists[_priorities - 1].getHead();
- }
-
- static class Factory implements QueueEntryListFactory
- {
- private final int _priorities;
-
- Factory(int priorities)
- {
- _priorities = priorities;
- }
-
- public QueueEntryList createQueueEntryList(AMQQueue queue)
- {
- return new PriorityQueueEntryList(queue, _priorities);
- }
- }
-
-
- @Override
- public void stop()
- {
- super.stop();
- for (QueueEntryList queueEntryList : _priorityLists)
- {
- queueEntryList.stop();
- }
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java
deleted file mode 100644
index 5c65cb6424..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.commons.configuration.ConfigurationException;
-
-public interface QueueBackingStore
-{
- /**
- * Retrieve the message with a given ID
- *
- * This method must be thread safe.
- *
- * Multiple calls to load with a given messageId DO NOT need to return the same object.
- *
- * @param messageId the id of the message to retreive.
- * @return
- */
- AMQMessage load(Long messageId);
-
- /**
- * Store a message in the BackingStore.
- *
- * This method must be thread safe understanding that multiple message objects may be the same data.
- *
- * Allowing a thread to return from this method means that it is safe to call load()
- *
- * Implementer guide:
- * Until the message has been loaded the message references will all be the same object.
- *
- * One appraoch as taken by the @see FileQueueBackingStore is to block aimulataneous calls to this method
- * until the message is fully on disk. This can be done by synchronising on message as initially it is always the
- * same object. Only after a load has taken place will there be a discrepency.
- *
- *
- * @param message the message to unload
- * @throws UnableToFlowMessageException
- */
- void unload(AMQMessage message) throws UnableToFlowMessageException;
-
- void delete(Long messageId);
-
- void close();
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStoreFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStoreFactory.java
deleted file mode 100644
index 3dd23a2f40..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStoreFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.commons.configuration.ConfigurationException;
-
-public interface QueueBackingStoreFactory
-{
- void configure(VirtualHost virtualHost, VirtualHostConfiguration config) throws ConfigurationException;
-
- public QueueBackingStore createBacking(AMQQueue queue);
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
deleted file mode 100644
index fb23edb3c5..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ /dev/null
@@ -1,233 +0,0 @@
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.subscription.Subscription;
-
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQException>
-{
- public static enum State
- {
- AVAILABLE,
- ACQUIRED,
- EXPIRED,
- DEQUEUED,
- DELETED
- }
-
- public static interface StateChangeListener
- {
- public void stateChanged(QueueEntry entry, State oldSate, State newState);
- }
-
- public abstract class EntryState
- {
- private EntryState()
- {
- }
-
- public abstract State getState();
- }
-
- public final class AvailableState extends EntryState
- {
-
- public State getState()
- {
- return State.AVAILABLE;
- }
- }
-
- public final class DequeuedState extends EntryState
- {
-
- public State getState()
- {
- return State.DEQUEUED;
- }
- }
-
- public final class DeletedState extends EntryState
- {
-
- public State getState()
- {
- return State.DELETED;
- }
- }
-
- public final class ExpiredState extends EntryState
- {
-
- public State getState()
- {
- return State.EXPIRED;
- }
- }
-
- public final class NonSubscriptionAcquiredState extends EntryState
- {
- public State getState()
- {
- return State.ACQUIRED;
- }
- }
-
- public final class SubscriptionAcquiredState extends EntryState
- {
- private final Subscription _subscription;
-
- public SubscriptionAcquiredState(Subscription subscription)
- {
- _subscription = subscription;
- }
-
- public State getState()
- {
- return State.ACQUIRED;
- }
-
- public Subscription getSubscription()
- {
- return _subscription;
- }
- }
-
- final static EntryState AVAILABLE_STATE = new AvailableState();
- final static EntryState DELETED_STATE = new DeletedState();
- final static EntryState DEQUEUED_STATE = new DequeuedState();
- final static EntryState EXPIRED_STATE = new ExpiredState();
- final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState();
-
- /** Flag to indicate that this message requires 'immediate' delivery. */
-
- final static byte IMMEDIATE = 0x01;
-
- /**
- * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality
- * for messages published with the 'immediate' flag.
- */
-
- final static byte DELIVERED_TO_CONSUMER = 0x02;
-
-
- AMQQueue getQueue();
-
- AMQMessage getMessage();
-
- Long getMessageId();
-
- long getSize();
-
- /**
- * Called selectors to determin if the message has already been sent
- *
- * @return _deliveredToConsumer
- */
- boolean getDeliveredToConsumer();
-
- /**
- * Checks to see if the message has expired. If it has the message is dequeued.
- *
- * @return true if the message has expire
- *
- * @throws org.apache.qpid.AMQException
- */
- boolean expired() throws AMQException;
-
- public void setExpiration(final long expiration);
-
- boolean isAcquired();
-
- boolean isAvailable();
-
- boolean acquire();
-
- boolean acquire(Subscription sub);
-
- boolean delete();
-
- boolean isDeleted();
-
- boolean acquiredBySubscription();
-
- /**
- * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
- * And for selector efficiency.
- *
- * This is now also used to unload the message if this entry is on a flowed queue. As a result this method should
- * only be called after the message has been sent.
- */
- void setDeliveredToSubscription();
-
- void release();
-
- String debugIdentity();
-
- /**
- * Called to enforce the 'immediate' flag.
- *
- * @returns true if the message is marked for immediate delivery but has not been marked as delivered
- * to a consumer
- */
- boolean immediateAndNotDelivered();
-
- void setRedelivered(boolean b);
-
- Subscription getDeliveredSubscription();
-
- void reject();
-
- void reject(Subscription subscription);
-
- boolean isRejectedBy(Subscription subscription);
-
- void requeue(StoreContext storeContext) throws AMQException;
-
- void dequeue(final StoreContext storeContext) throws FailedDequeueException;
-
- /**
- * Message has been ack so dequeueAndDelete it.
- * If the message is persistent and this is the last QueueEntry that uses it then the data will be removed
- * from the transaciton log
- *
- * @param storeContext the transactional Context in which to perform the deletion
- *
- * @throws FailedDequeueException
- * @throws MessageCleanupException
- */
- void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException;
-
- boolean isQueueDeleted();
-
- void addStateChangeListener(StateChangeListener listener);
-
- boolean removeStateChangeListener(StateChangeListener listener);
-
- void unload();
-
- AMQMessage load();
-
- boolean isFlowed();
-
-} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
deleted file mode 100644
index e6223ef4ac..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ /dev/null
@@ -1,568 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.subscription.Subscription;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
-public class QueueEntryImpl implements QueueEntry
-{
-
- /** Used for debugging purposes. */
- private static final Logger _log = Logger.getLogger(QueueEntryImpl.class);
-
- private final SimpleQueueEntryList _queueEntryList;
-
- private AtomicReference<AMQMessage> _messageRef;
-
- private boolean _redelivered;
-
- private Set<Subscription> _rejectedBy = null;
-
- private volatile EntryState _state = AVAILABLE_STATE;
-
- private static final
- AtomicReferenceFieldUpdater<QueueEntryImpl, EntryState>
- _stateUpdater =
- AtomicReferenceFieldUpdater.newUpdater
- (QueueEntryImpl.class, EntryState.class, "_state");
-
- private volatile Set<StateChangeListener> _stateChangeListeners;
-
- private static final
- AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
- _listenersUpdater =
- AtomicReferenceFieldUpdater.newUpdater
- (QueueEntryImpl.class, Set.class, "_stateChangeListeners");
-
- private static final
- AtomicLongFieldUpdater<QueueEntryImpl>
- _entryIdUpdater =
- AtomicLongFieldUpdater.newUpdater
- (QueueEntryImpl.class, "_entryId");
-
- private volatile long _entryId;
-
- volatile QueueEntryImpl _next;
-
- private long _messageSize;
- private QueueBackingStore _backingStore;
- private AtomicBoolean _flowed;
- private Long _messageId;
-
- private byte _flags = 0;
-
- private long _expiration;
-
- private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
- private boolean _persistent;
- private boolean _hasBeenUnloaded = false;
-
- QueueEntryImpl(SimpleQueueEntryList queueEntryList)
- {
- this(queueEntryList, null, Long.MIN_VALUE);
- _state = DELETED_STATE;
- }
-
- public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId)
- {
- this(queueEntryList, message);
-
- _entryIdUpdater.set(this, entryId);
- }
-
- public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message)
- {
- _queueEntryList = queueEntryList;
- _messageRef = new AtomicReference<AMQMessage>(message);
- if (message != null)
- {
- _messageId = message.getMessageId();
- _messageSize = message.getSize();
-
- if (message.isImmediate())
- {
- _flags |= IMMEDIATE;
- }
- _expiration = message.getExpiration();
- _persistent = message.isPersistent();
- }
- _backingStore = queueEntryList.getBackingStore();
- _flowed = new AtomicBoolean(false);
- }
-
- protected void setEntryId(long entryId)
- {
- _entryIdUpdater.set(this, entryId);
- }
-
- protected long getEntryId()
- {
- return _entryId;
- }
-
- public AMQQueue getQueue()
- {
- return _queueEntryList.getQueue();
- }
-
- public AMQMessage getMessage()
- {
- return load();
- }
-
- public Long getMessageId()
- {
- return _messageId;
- }
-
- public long getSize()
- {
- return _messageSize;
- }
-
- public boolean getDeliveredToConsumer()
- {
- return (_flags & DELIVERED_TO_CONSUMER) != 0;
- }
-
- /**
- * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
- * And for selector efficiency.
- *
- * This is now also used to unload the message if this entry is on a flowed queue. As a result this method should
- * only be called after the message has been sent.
- */
- public void setDeliveredToSubscription()
- {
- _flags |= DELIVERED_TO_CONSUMER;
-
- // We have delivered this message so we can unload it if we are flowed.
- if (_queueEntryList.isFlowed())
- {
- unload();
- }
- }
-
- public boolean expired() throws AMQException
- {
- if (_expiration != 0L)
- {
- long now = System.currentTimeMillis();
-
- return (now > _expiration);
- }
-
- return false;
- }
-
- public void setExpiration(final long expiration)
- {
- _expiration = expiration;
- }
-
- public boolean isAcquired()
- {
- return _state.getState() == State.ACQUIRED;
- }
-
- public boolean isAvailable()
- {
- return _state.getState() == State.AVAILABLE;
- }
-
- public boolean acquire()
- {
- return acquire(NON_SUBSCRIPTION_ACQUIRED_STATE);
- }
-
- private boolean acquire(final EntryState state)
- {
- boolean acquired = _stateUpdater.compareAndSet(this, AVAILABLE_STATE, state);
- if (acquired && _stateChangeListeners != null)
- {
- notifyStateChange(State.AVAILABLE, State.ACQUIRED);
- }
-
- return acquired;
- }
-
- public boolean acquire(Subscription sub)
- {
- return acquire(sub.getOwningState());
- }
-
- public boolean acquiredBySubscription()
- {
-
- return (_state instanceof SubscriptionAcquiredState);
- }
-
- public void release()
- {
- _stateUpdater.set(this, AVAILABLE_STATE);
- }
-
- public String debugIdentity()
- {
- String entry = "[State:" + _state.getState().name() + "]";
-
- AMQMessage message = _messageRef.get();
-
- if (message == null)
- {
- return entry + "(Message Unloaded ID:" + _messageId + ")";
- }
- else
- {
-
- return entry + message.debugIdentity();
- }
- }
-
- public boolean immediateAndNotDelivered()
- {
- return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
- }
-
- public ContentHeaderBody getContentHeaderBody() throws AMQException
- {
- return getMessage().getContentHeaderBody();
- }
-
- public boolean isPersistent() throws AMQException
- {
- return _persistent;
- }
-
- public boolean isRedelivered()
- {
- return _redelivered;
- }
-
- public void setRedelivered(boolean redelivered)
- {
- _redelivered = redelivered;
- // todo - here we could record this message as redelivered on this queue in the transactionLog
- // so we don't have to mark all messages on recover as redelivered.
- }
-
- public Subscription getDeliveredSubscription()
- {
- EntryState state = _state;
- if (state instanceof SubscriptionAcquiredState)
- {
- return ((SubscriptionAcquiredState) state).getSubscription();
- }
- else
- {
- return null;
- }
-
- }
-
- public void reject()
- {
- reject(getDeliveredSubscription());
- }
-
- public void reject(Subscription subscription)
- {
- if (subscription != null)
- {
- if (_rejectedBy == null)
- {
- _rejectedBy = new HashSet<Subscription>();
- }
-
- _rejectedBy.add(subscription);
- }
- else
- {
- _log.warn("Requesting rejection by null subscriber:" + debugIdentity());
- }
- }
-
- public boolean isRejectedBy(Subscription subscription)
- {
-
- if (_rejectedBy != null) // We have subscriptions that rejected this message
- {
- return _rejectedBy.contains(subscription);
- }
- else // This messasge hasn't been rejected yet.
- {
- return false;
- }
- }
-
- public void requeue(final StoreContext storeContext) throws AMQException
- {
- getQueue().requeue(storeContext, this);
- if (_stateChangeListeners != null)
- {
- notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
- }
- }
-
- public void dequeue(final StoreContext storeContext) throws FailedDequeueException
- {
- EntryState state = _state;
-
- if ((state.getState() == State.ACQUIRED) && _stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
- {
- if (state instanceof SubscriptionAcquiredState)
- {
- Subscription s = ((SubscriptionAcquiredState) state).getSubscription();
- s.restoreCredit(this);
- }
-
- _queueEntryList.dequeued(this);
-
- getQueue().dequeue(storeContext, this);
-
- if (_stateChangeListeners != null)
- {
- notifyStateChange(state.getState(), QueueEntry.State.DEQUEUED);
- }
- }
- }
-
- private void notifyStateChange(final State oldState, final State newState)
- {
- for (StateChangeListener l : _stateChangeListeners)
- {
- l.stateChanged(this, oldState, newState);
- }
- }
-
- public void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException
- {
- //if the queue is null (i.e. queue.delete()'d) then the message is waiting to be acked, but has already be delete()'d;
- if (getQueue() != null)
- {
- dequeue(storeContext);
- }
-
- delete();
- }
-
- public boolean isQueueDeleted()
- {
- return getQueue().isDeleted();
- }
-
- public void addStateChangeListener(StateChangeListener listener)
- {
- Set<StateChangeListener> listeners = _stateChangeListeners;
- if (listeners == null)
- {
- _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener>());
- listeners = _stateChangeListeners;
- }
-
- listeners.add(listener);
- }
-
- public boolean removeStateChangeListener(StateChangeListener listener)
- {
- Set<StateChangeListener> listeners = _stateChangeListeners;
- if (listeners != null)
- {
- return listeners.remove(listener);
- }
-
- return false;
- }
-
- public void unload()
- {
- //Get the currently loaded message
- AMQMessage message = _messageRef.get();
-
- // If we have a message in memory and we have a valid backingStore attempt to unload
- if (message != null && _backingStore != null)
- {
- try
- {
- // The backingStore will now handle concurrent calls to unload and safely synchronize to ensure
- // multiple initial unloads are unloads
- _backingStore.unload(message);
- _hasBeenUnloaded = true;
- _messageRef.set(null);
-
- if (_log.isDebugEnabled())
- {
- _log.debug("Unloaded:" + debugIdentity());
- }
-
-
- // Clear the message reference if the loaded message is still the one we are processing.
-
- //Update the memoryState if this load call resulted in the message being purged from memory
- if (!_flowed.getAndSet(true))
- {
- _queueEntryList.entryUnloadedUpdateMemory(this);
- }
-
- }
- catch (UnableToFlowMessageException utfme)
- {
- // There is no recovery needed as the memory states remain unchanged.
- if (_log.isDebugEnabled())
- {
- _log.debug("Unable to Flow message:" + debugIdentity() + ", due to:" + utfme.getMessage());
- }
- }
- }
- }
-
- public AMQMessage load()
- {
- // MessageId and Backing store are null in test scenarios, normally this is not the case.
- if (_messageId != null && _backingStore != null)
- {
- // See if we have the message currently in memory to return
- AMQMessage message = _messageRef.get();
- // if we don't then we need to start a load process.
- if (message == null)
- {
- //Synchronize here to ensure only the first thread that attempts to load will perform the load from the
- // backing store.
- synchronized (this)
- {
- // Check again to see if someone else ahead of us loaded the message
- message = _messageRef.get();
- // if we still don't have the message then we need to start a load process.
- if (message == null)
- {
- // Load the message and keep a reference to it
- message = _backingStore.load(_messageId);
- // Set the message reference
- _messageRef.set(message);
- }
- else
- {
- // If someone else loaded the message then we can jump out here as the Memory Updates will
- // have been performed by the loading thread
- return message;
- }
- }
-
- if (_log.isDebugEnabled())
- {
- _log.debug("Loaded:" + debugIdentity());
- }
-
- //Update the memoryState if this load call resulted in the message comming in to memory
- if (_flowed.getAndSet(false))
- {
- _queueEntryList.entryLoadedUpdateMemory(this);
- }
- }
-
- // Return the message that was either already in memory or the value we just loaded.
- return message;
- }
- // This can be null but only in the case where we have no messageId
- // in the case where we have no backingStore then we will never have unloaded the message
- return _messageRef.get();
- }
-
- public boolean isFlowed()
- {
- return _flowed.get();
- }
-
- public int compareTo(final QueueEntry o)
- {
- QueueEntryImpl other = (QueueEntryImpl) o;
- return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0;
- }
-
- public QueueEntryImpl getNext()
- {
-
- QueueEntryImpl next = nextNode();
- while (next != null && next.isDeleted())
- {
-
- final QueueEntryImpl newNext = next.nextNode();
- if (newNext != null)
- {
- SimpleQueueEntryList._nextUpdater.compareAndSet(this, next, newNext);
- next = nextNode();
- }
- else
- {
- next = null;
- }
-
- }
- return next;
- }
-
- QueueEntryImpl nextNode()
- {
- return _next;
- }
-
- public boolean isDeleted()
- {
- return _state == DELETED_STATE;
- }
-
- public boolean delete()
- {
- EntryState state = _state;
-
- if (state != DELETED_STATE && _stateUpdater.compareAndSet(this, state, DELETED_STATE))
- {
- _queueEntryList.advanceHead();
- if (_backingStore != null && _hasBeenUnloaded)
- {
- _backingStore.delete(_messageId);
- }
- return true;
- }
- else
- {
- return false;
- }
- }
-
- public QueueEntryList getQueueEntryList()
- {
- return _queueEntryList;
- }
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java
deleted file mode 100644
index c5c115a2d1..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.queue;
-
-public interface QueueEntryIterator
-{
- boolean atTail();
-
- QueueEntry getNode();
-
- boolean advance();
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
deleted file mode 100644
index 2bbdf610de..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-public interface QueueEntryList
-{
- AMQQueue getQueue();
-
- QueueEntry add(AMQMessage message);
-
- QueueEntry next(QueueEntry node);
-
- QueueEntryIterator iterator();
-
- QueueEntry getHead();
-
- void setFlowed(boolean flowed);
-
- boolean isFlowed();
-
- int size();
-
- long dataSize();
-
- long memoryUsed();
-
- void setMemoryUsageMaximum(long maximumMemoryUsage);
-
- long getMemoryUsageMaximum();
-
- void setMemoryUsageMinimum(long minimumMemoryUsage);
-
- long getMemoryUsageMinimum();
-
- /**
- * Immediately update memory usage based on the unload of this queueEntry, potentially start inhaler.
- * @param queueEntry the entry that has been unloaded
- */
- void entryUnloadedUpdateMemory(QueueEntry queueEntry);
-
- /**
- * Immediately update memory usage based on the load of this queueEntry
- * @param queueEntry the entry that has been loaded
- */
- void entryLoadedUpdateMemory(QueueEntry queueEntry);
-
- void stop();
-
- /**
- * Mark this queue as part of another QueueEntryList for accounting purposes.
- *
- * All Calls from the QueueEntry to the QueueEntryList need to check if there is
- * a parent QueueEntrylist upon which the action should take place.
- *
- * @param queueEntryList The parent queue that is performing accounting.
- */
- void setParentQueueEntryList(FlowableBaseQueueEntryList queueEntryList);
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
deleted file mode 100644
index 4dbce45f67..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.queue;
-
-interface QueueEntryListFactory
-{
- public QueueEntryList createQueueEntryList(AMQQueue queue);
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
deleted file mode 100644
index 959ca03c80..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-
-public interface QueueNotificationListener
-{
- void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg);
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
deleted file mode 100644
index 1210f0e97c..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import java.util.Collection;
-
-public interface QueueRegistry
-{
- VirtualHost getVirtualHost();
-
- void registerQueue(AMQQueue queue) throws AMQException;
-
- void unregisterQueue(AMQShortString name) throws AMQException;
-
- AMQQueue getQueue(AMQShortString name);
-
- Collection<AMQShortString> getQueueNames();
-
- Collection<AMQQueue> getQueues();
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
deleted file mode 100644
index 6a19acddd7..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ /dev/null
@@ -1,1666 +0,0 @@
-package org.apache.qpid.server.queue;
-
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.management.JMException;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.pool.ReadWriteRunnable;
-import org.apache.qpid.pool.ReferenceCountingExecutorService;
-import org.apache.qpid.server.configuration.QueueConfiguration;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionList;
-import org.apache.qpid.server.transactionlog.TransactionLog;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
-{
- private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
-
- private final AMQShortString _name;
-
- /** null means shared */
- private final AMQShortString _owner;
-
- private final boolean _durable;
-
- /** If true, this queue is deleted when the last subscriber is removed */
- private final boolean _autoDelete;
-
- private final VirtualHost _virtualHost;
-
- /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
- private final ExchangeBindings _bindings = new ExchangeBindings(this);
-
- private final AtomicBoolean _deleted = new AtomicBoolean(false);
-
- private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
-
- private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
-
- protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
- private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
-
- private volatile Subscription _exclusiveSubscriber;
-
- protected final QueueEntryList _entries;
-
- private final AMQQueueMBean _managedObject;
- private final Executor _asyncDelivery;
- private final AtomicLong _totalMessagesReceived = new AtomicLong();
-
- /** max allowed size(KB) of a single message */
- public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize();
-
- /** max allowed number of messages on a queue. */
- public long _maximumMessageCount = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageCount();
-
- /** max queue depth for the queue */
- public long _maximumQueueDepth = ApplicationRegistry.getInstance().getConfiguration().getMaximumQueueDepth();
-
- /** maximum message age before alerts occur */
- public long _maximumMessageAge = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageAge();
-
- /** the minimum interval between sending out consecutive alerts of the same type */
- public long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap();
-
-
- private static final int MAX_ASYNC_DELIVERIES = 10;
-
- private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
-
- private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
- private AtomicReference _asynchronousRunner = new AtomicReference(null);
- private AtomicInteger _deliveredMessages = new AtomicInteger();
- private AtomicBoolean _stopped = new AtomicBoolean(false);
-
- protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
- throws AMQException
- {
- this(name, durable, owner, autoDelete, virtualHost, new SimpleQueueEntryList.Factory());
- }
-
- protected SimpleAMQQueue(AMQShortString name,
- boolean durable,
- AMQShortString owner,
- boolean autoDelete,
- VirtualHost virtualHost,
- QueueEntryListFactory entryListFactory)
- throws AMQException
- {
-
- if (name == null)
- {
- throw new IllegalArgumentException("Queue name must not be null");
- }
-
- if (virtualHost == null)
- {
- throw new IllegalArgumentException("Virtual Host must not be null");
- }
-
- _name = name;
- _durable = durable;
- _owner = owner;
- _autoDelete = autoDelete;
- _virtualHost = virtualHost;
- _entries = entryListFactory.createQueueEntryList(this);
-
- _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
-
- try
- {
- _managedObject = new AMQQueueMBean(this);
- _managedObject.register();
- }
- catch (JMException e)
- {
- throw new AMQException("AMQQueue MBean creation has failed ", e);
- }
-
- resetNotifications();
- }
-
- public void resetNotifications()
- {
- // This ensure that the notification checks for the configured alerts are created.
- setMaximumMessageAge(_maximumMessageAge);
- setMaximumMessageCount(_maximumMessageCount);
- setMaximumMessageSize(_maximumMessageSize);
- setMaximumQueueDepth(_maximumQueueDepth);
- }
-
- // ------ Getters and Setters
-
- public AMQShortString getName()
- {
- return _name;
- }
-
- public boolean isDurable()
- {
- return _durable;
- }
-
- public boolean isAutoDelete()
- {
- return _autoDelete;
- }
-
- public boolean isFlowed()
- {
- return _entries.isFlowed();
- }
-
- public AMQShortString getOwner()
- {
- return _owner;
- }
-
- public VirtualHost getVirtualHost()
- {
- return _virtualHost;
- }
-
- // ------ bind and unbind
-
- public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
- {
- exchange.registerQueue(routingKey, this, arguments);
- if (isDurable() && exchange.isDurable())
- {
- _virtualHost.getRoutingTable().bindQueue(exchange, routingKey, this, arguments);
- }
-
- _bindings.addBinding(routingKey, arguments, exchange);
- }
-
- public void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
- {
- exchange.deregisterQueue(routingKey, this, arguments);
- if (isDurable() && exchange.isDurable())
- {
- _virtualHost.getRoutingTable().unbindQueue(exchange, routingKey, this, arguments);
- }
-
- boolean removed = _bindings.remove(routingKey, arguments, exchange);
- if (!removed)
- {
- _logger.error("Mismatch between queue bindings and exchange record of bindings");
- }
- }
-
- public List<ExchangeBinding> getExchangeBindings()
- {
- return new ArrayList<ExchangeBinding>(_bindings.getExchangeBindings());
- }
-
- // ------ Manage Subscriptions
-
- public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException
- {
-
- if (isExclusiveSubscriber())
- {
- throw new ExistingExclusiveSubscription();
- }
-
- if (exclusive)
- {
- if (getConsumerCount() != 0)
- {
- throw new ExistingSubscriptionPreventsExclusive();
- }
- else
- {
- _exclusiveSubscriber = subscription;
-
- }
- }
-
- _activeSubscriberCount.incrementAndGet();
- subscription.setStateListener(this);
- subscription.setLastSeenEntry(null, _entries.getHead());
-
- if (!isDeleted())
- {
- subscription.setQueue(this);
- _subscriptionList.add(subscription);
- if (isDeleted())
- {
- subscription.queueDeleted(this);
- }
- }
- else
- {
- // TODO
- }
-
- deliverAsync(subscription);
-
- }
-
- public synchronized void unregisterSubscription(final Subscription subscription) throws AMQException
- {
- if (subscription == null)
- {
- throw new NullPointerException("subscription argument is null");
- }
-
- boolean removed = _subscriptionList.remove(subscription);
-
- if (removed)
- {
- subscription.close();
- // No longer can the queue have an exclusive consumer
- setExclusiveSubscriber(null);
-
- QueueEntry lastSeen;
-
- while ((lastSeen = subscription.getLastSeenEntry()) != null)
- {
- subscription.setLastSeenEntry(lastSeen, null);
- }
-
- // auto-delete queues must be deleted if there are no remaining subscribers
-
- if (_autoDelete && getConsumerCount() == 0)
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Auto-deleteing queue:" + this);
- }
-
- delete();
-
- // we need to manually fire the event to the removed subscription (which was the last one left for this
- // queue. This is because the delete method uses the subscription set which has just been cleared
- subscription.queueDeleted(this);
- }
- }
-
- }
-
- // ------ Enqueue / Dequeue
-
- public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException
- {
- _totalMessagesReceived.incrementAndGet();
-
- QueueEntry entry;
- Subscription exclusiveSub = _exclusiveSubscriber;
-
- if (exclusiveSub != null)
- {
- exclusiveSub.getSendLock();
-
- try
- {
- entry = _entries.add(message);
-
- deliverToSubscription(exclusiveSub, entry);
-
- // where there is more than one producer there's a reasonable chance that even though there is
- // no "queueing" we do not deliver because we get an interleving of _entries.add and
- // deliverToSubscription between threads. Therefore have one more try.
- if (!(entry.isAcquired() || entry.isDeleted()))
- {
- deliverToSubscription(exclusiveSub, entry);
- }
- }
- finally
- {
- exclusiveSub.releaseSendLock();
- }
- }
- else
- {
- entry = _entries.add(message);
- /*
-
- iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
-
- */
- SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
- SubscriptionList.SubscriptionNode nextNode = node.getNext();
- if (nextNode == null)
- {
- nextNode = _subscriptionList.getHead().getNext();
- }
- while (nextNode != null)
- {
- if (_lastSubscriptionNode.compareAndSet(node, nextNode))
- {
- break;
- }
- else
- {
- node = _lastSubscriptionNode.get();
- nextNode = node.getNext();
- if (nextNode == null)
- {
- nextNode = _subscriptionList.getHead().getNext();
- }
- }
- }
-
- // always do one extra loop after we believe we've finished
- // this catches the case where we *just* miss an update
- int loops = 2;
-
- while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0)
- {
- if (nextNode == null)
- {
- loops--;
- nextNode = _subscriptionList.getHead();
- }
- else
- {
- // if subscription at end, and active, offer
- Subscription sub = nextNode.getSubscription();
- deliverToSubscription(sub, entry);
- }
- nextNode = nextNode.getNext();
-
- }
- }
-
- if (entry.immediateAndNotDelivered())
- {
- //We acquire the message here to ensure that the dequeueAndDelete will correctly remove the content
- // from the transactionLog. This saves us from having to have a custom dequeueAndDelete that checks
- // for the AVAILABLE state of an entry rather than the ACQUIRED state that it currently uses.
- entry.acquire();
- entry.dequeueAndDelete(storeContext);
- }
- else if (!(entry.isAcquired() || entry.isDeleted()))
- {
- checkSubscriptionsNotAheadOfDelivery(entry);
-
- deliverAsync();
- }
-
- _managedObject.checkForNotification(entry);
-
- return entry;
- }
-
- private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
- throws AMQException
- {
-
- sub.getSendLock();
- try
- {
- if (subscriptionReadyAndHasInterest(sub, entry)
- && !sub.isSuspended())
- {
- if (!sub.wouldSuspend(entry))
- {
- if (!sub.isBrowser() && !entry.acquire(sub))
- {
- // restore credit here that would have been taken away by wouldSuspend since we didn't manage
- // to acquire the entry for this subscription
- sub.restoreCredit(entry);
- }
- else
- {
-
- deliverMessage(sub, entry);
-
- }
- }
- }
- }
- finally
- {
- sub.releaseSendLock();
- }
- }
-
- protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
- {
- // This method is only required for queues which mess with ordering
- // Simple Queues don't :-)
- }
-
- private void deliverMessage(final Subscription sub, final QueueEntry entry)
- throws AMQException
- {
- _deliveredMessages.incrementAndGet();
-
- sub.send(entry);
-
- }
-
- private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry)
- {
-
- // We need to move this subscription on, past entries which are already acquired, or deleted or ones it has no
- // interest in.
- QueueEntry node = sub.getLastSeenEntry();
- while (node != null && (node.isAcquired() || node.isDeleted() || !sub.hasInterest(node)))
- {
-
- QueueEntry newNode = _entries.next(node);
- if (newNode != null)
- {
- sub.setLastSeenEntry(node, newNode);
- node = sub.getLastSeenEntry();
- }
- else
- {
- node = null;
- break;
- }
-
- }
-
- if (node == entry)
- {
- // If the first entry that subscription can process is the one we are trying to deliver to it, then we are
- // good
- return true;
- }
- else
- {
- // Otherwise we should try to update the subscription's last seen entry to the entry we got to, providing
- // no-one else has updated it to something furhter on in the list
- //TODO - check
- //updateLastSeenEntry(sub, entry);
- return false;
- }
-
- }
-
- private void updateLastSeenEntry(final Subscription sub, final QueueEntry entry)
- {
- QueueEntry node = sub.getLastSeenEntry();
-
- if (node != null && entry.compareTo(node) < 0 && sub.hasInterest(entry))
- {
- do
- {
- if (sub.setLastSeenEntry(node, entry))
- {
- return;
- }
- else
- {
- node = sub.getLastSeenEntry();
- }
- }
- while (node != null && entry.compareTo(node) < 0);
- }
-
- }
-
- public void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException
- {
-
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
- // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
- while (subscriberIter.advance())
- {
- Subscription sub = subscriberIter.getNode().getSubscription();
-
- // we don't make browsers send the same stuff twice
- if (!sub.isBrowser())
- {
- updateLastSeenEntry(sub, entry);
- }
- }
-
- deliverAsync();
-
- }
-
- /**
- * Only call from queue Entry
- * @param storeContext
- * @param entry
- * @throws FailedDequeueException
- */
- public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
- {
- if (entry.acquiredBySubscription())
- {
- _deliveredMessages.decrementAndGet();
- }
-
- try
- {
- if (entry.isPersistent())
- {
- _virtualHost.getTransactionLog().dequeueMessage(storeContext, this, entry.getMessageId());
- }
-
- }
- catch (MessageCleanupException e)
- {
- // Message was dequeued, but could not then be deleted
- // though it is no longer referenced. This should be very
- // rare and can be detected and cleaned up on recovery or
- // done through some form of manual intervention.
- _logger.error(e, e);
- }
- catch (AMQException e)
- {
- throw new FailedDequeueException(_name.toString(), e);
- }
-
- }
-
-
- public boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException
- {
- /* TODO : This is wrong as the subscription may be suspended, we should instead change the state of the message
- entry to resend and move back the subscription pointer. */
-
- subscription.getSendLock();
- try
- {
- if (!subscription.isClosed())
- {
- deliverMessage(subscription, entry);
- return true;
- }
- else
- {
- return false;
- }
- }
- finally
- {
- subscription.releaseSendLock();
- }
- }
-
- public int getConsumerCount()
- {
- return _subscriptionList.size();
- }
-
- public int getActiveConsumerCount()
- {
- return _activeSubscriberCount.get();
- }
-
- public boolean isUnused()
- {
- return getConsumerCount() == 0;
- }
-
- public boolean isEmpty()
- {
- return getMessageCount() == 0;
- }
-
- public long getMemoryUsageCurrent()
- {
- return getQueueInMemory();
- }
-
- public int getMessageCount()
- {
- return getQueueCount();
- }
-
- public long getQueueDepth()
- {
- return getQueueSize();
- }
-
- public int getUndeliveredMessageCount()
- {
- int count = getMessageCount() - _deliveredMessages.get();
- if (count < 0)
- {
- return 0;
- }
- else
- {
- return count;
- }
- }
-
- public long getReceivedMessageCount()
- {
- return _totalMessagesReceived.get();
- }
-
- public long getOldestMessageArrivalTime()
- {
- QueueEntry entry = getOldestQueueEntry();
- return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime();
- }
-
- protected QueueEntry getOldestQueueEntry()
- {
- return _entries.next(_entries.getHead());
- }
-
- public boolean isDeleted()
- {
- return _deleted.get();
- }
-
- public List<QueueEntry> getMessagesOnTheQueue()
- {
- ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
- QueueEntryIterator queueListIterator = _entries.iterator();
- while (queueListIterator.advance())
- {
- QueueEntry node = queueListIterator.getNode();
- if (node != null && !node.isDeleted())
- {
- entryList.add(node);
- }
- }
- return entryList;
-
- }
-
- public void stateChange(Subscription sub, Subscription.State oldState, Subscription.State newState)
- {
- if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE)
- {
- _activeSubscriberCount.decrementAndGet();
-
- }
- else if (newState == Subscription.State.ACTIVE)
- {
- if (oldState != Subscription.State.ACTIVE)
- {
- _activeSubscriberCount.incrementAndGet();
-
- }
- deliverAsync(sub);
- }
- }
-
- public int compareTo(final AMQQueue o)
- {
- return _name.compareTo(o.getName());
- }
-
- public int getQueueCount()
- {
- return _entries.size();
- }
-
- public long getQueueSize()
- {
- return _entries.dataSize();
- }
-
- public long getQueueInMemory()
- {
- return _entries.memoryUsed();
- }
-
- private boolean isExclusiveSubscriber()
- {
- return _exclusiveSubscriber != null;
- }
-
- private void setExclusiveSubscriber(Subscription exclusiveSubscriber)
- {
- _exclusiveSubscriber = exclusiveSubscriber;
- }
-
- public static interface QueueEntryFilter
- {
- public boolean accept(QueueEntry entry);
-
- public boolean filterComplete();
- }
-
- public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
- {
- return getMessagesOnTheQueue(new QueueEntryFilter()
- {
-
- public boolean accept(QueueEntry entry)
- {
- final long messageId = entry.getMessageId();
- return messageId >= fromMessageId && messageId <= toMessageId;
- }
-
- public boolean filterComplete()
- {
- return false;
- }
- });
- }
-
- public QueueEntry getMessageOnTheQueue(final long messageId)
- {
- List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
- {
- private boolean _complete;
-
- public boolean accept(QueueEntry entry)
- {
- _complete = entry.getMessageId() == messageId;
- return _complete;
- }
-
- public boolean filterComplete()
- {
- return _complete;
- }
- });
- return entries.isEmpty() ? null : entries.get(0);
- }
-
- public List<QueueEntry> getMessagesOnTheQueue(QueueEntryFilter filter)
- {
- ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
- QueueEntryIterator queueListIterator = _entries.iterator();
- while (queueListIterator.advance() && !filter.filterComplete())
- {
- QueueEntry node = queueListIterator.getNode();
- if (!node.isDeleted() && filter.accept(node))
- {
- entryList.add(node);
- }
- }
- return entryList;
-
- }
-
-
- public void moveMessagesToAnotherQueue(final long fromMessageId,
- final long toMessageId,
- String queueName,
- StoreContext storeContext)
- {
- // The move is a two step process. First the messages are moved in the _transactionLog.
- // That is persistent messages are moved queues on disk for recovery and the QueueEntries removed from the
- // existing queue.
- // This is done as Queue.enqueue() does not write the data to the transactionLog. In normal message delivery
- // this is done as the message is recieved.
- // So The final step is to enqueue the messages on the new queue.
-
- AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
- TransactionLog transactionLog = getVirtualHost().getTransactionLog();
-
- if (toQueue.equals(this))
- {
- //nothing to do here, message is already at the requested destination
- return;
- }
-
- List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
- {
-
- public boolean accept(QueueEntry entry)
- {
- final long messageId = entry.getMessageId();
- return (messageId >= fromMessageId)
- && (messageId <= toMessageId)
- && entry.acquire();
- }
-
- public boolean filterComplete()
- {
- return false;
- }
- });
-
- try
- {
- transactionLog.beginTran(storeContext);
-
- // Move the messages in the transaction log.
- for (QueueEntry entry : entries)
- {
- if (entry.isPersistent())
- {
- //FIXME
- //fixme
-
- // Creating a list with the destination queue AND the current queue.
- // This is a hack to ensure a reference is kept in the TLog to the new destination when dequeing
- // the old destination below, thus preventing incorrect removal of the message from the store
- ArrayList<AMQQueue> list = new ArrayList<AMQQueue>();
- list.add(toQueue);
- list.add(this);
- transactionLog.enqueueMessage(storeContext, list, entry.getMessageId());
- }
- // dequeue will remove the messages from the queue
- entry.dequeue(storeContext);
- }
-
- // Commit and flush the move transactions.
- try
- {
- transactionLog.commitTran(storeContext);
- }
- catch (AMQException e)
- {
- throw new RuntimeException("Failed to commit transaction whilst moving messages on transaction log.", e);
- }
- }
- catch (AMQException e)
- {
- try
- {
- transactionLog.abortTran(storeContext);
- }
- catch (AMQException rollbackEx)
- {
- _logger.error("Failed to rollback transaction when error occured moving messages", rollbackEx);
- }
- throw new RuntimeException(e);
- }
-
- try
- {
- // Add messages to new queue
- for (QueueEntry entry : entries)
- {
- toQueue.enqueue(storeContext, entry.getMessage());
- // As we only did a dequeue above now that we have moved the message we should perform a delete.
- // We cannot do this earlier as the message will be lost if flowed.
- entry.delete();
- }
- }
- catch (MessageCleanupException e)
- {
- throw new RuntimeException(e);
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
-
- }
-
- public void copyMessagesToAnotherQueue(final long fromMessageId,
- final long toMessageId,
- String queueName,
- final StoreContext storeContext)
- {
- AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
- TransactionLog transactionLog = getVirtualHost().getTransactionLog();
-
- if (toQueue.equals(this))
- {
- //nothing to do here, message is already at the requested destination
- return;
- }
-
- List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
- {
-
- public boolean accept(QueueEntry entry)
- {
- final long messageId = entry.getMessageId();
- if ((messageId >= fromMessageId)
- && (messageId <= toMessageId))
- {
- if (!entry.isDeleted())
- {
- return true;
- }
- }
-
- return false;
- }
-
- public boolean filterComplete()
- {
- return false;
- }
- });
-
- try
- {
- transactionLog.beginTran(storeContext);
-
- // Move the messages in on the transaction log.
- for (QueueEntry entry : entries)
- {
- if (!entry.isDeleted() && entry.isPersistent())
- {
- //fixme
- //FIXME
-
- // Creating a list with the destination queue AND the current queue.
- // This is a hack to ensure a reference is kept in the TLog to the old destination when enqueing
- ArrayList list = new ArrayList();
- list.add(this);
- list.add(toQueue);
- transactionLog.enqueueMessage(storeContext, list, entry.getMessageId());
- }
- }
-
-
- // Commit and flush the move transcations.
- try
- {
- transactionLog.commitTran(storeContext);
- }
- catch (AMQException e)
- {
- throw new RuntimeException("Failed to commit transaction whilst moving messages on transaction log.", e);
- }
- }
- catch (AMQException e)
- {
- try
- {
- transactionLog.abortTran(storeContext);
- }
- catch (AMQException rollbackEx)
- {
- _logger.error("Failed to rollback transaction when error occured moving messages", rollbackEx);
- }
- throw new RuntimeException(e);
- }
-
- try
- {
- for (QueueEntry entry : entries)
- {
- if (!entry.isDeleted())
- {
- toQueue.enqueue(storeContext, entry.getMessage());
- }
- }
- }
- catch (MessageCleanupException e)
- {
- throw new RuntimeException(e);
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
-
- }
-
- public void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext)
- {
-
- try
- {
- QueueEntryIterator queueListIterator = _entries.iterator();
-
- while (queueListIterator.advance())
- {
- QueueEntry node = queueListIterator.getNode();
-
- final long messageId = node.getMessageId();
-
- if ((messageId >= fromMessageId)
- && (messageId <= toMessageId)
- && !node.isDeleted()
- && node.acquire())
- {
- node.dequeueAndDelete(storeContext);
- }
-
- }
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
-
- }
-
- // ------ Management functions
-
- public void deleteMessageFromTop(StoreContext storeContext) throws AMQException
- {
- QueueEntryIterator queueListIterator = _entries.iterator();
- boolean noDeletes = true;
-
- while (noDeletes && queueListIterator.advance())
- {
- QueueEntry node = queueListIterator.getNode();
- if (!node.isDeleted() && node.acquire())
- {
- node.dequeueAndDelete(storeContext);
- noDeletes = false;
- }
-
- }
- }
-
- public long clearQueue(StoreContext storeContext) throws AMQException
- {
-
- QueueEntryIterator queueListIterator = _entries.iterator();
- long count = 0;
-
- while (queueListIterator.advance())
- {
- QueueEntry node = queueListIterator.getNode();
- if (!node.isDeleted() && node.acquire())
- {
- node.dequeueAndDelete(storeContext);
- count++;
- }
-
- }
- return count;
-
- }
-
- public void addQueueDeleteTask(final Task task)
- {
- _deleteTaskList.add(task);
- }
-
- public int delete() throws AMQException
- {
- if (!_deleted.getAndSet(true))
- {
-
- SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
-
- while (subscriptionIter.advance())
- {
- Subscription s = subscriptionIter.getNode().getSubscription();
- if (s != null)
- {
- s.queueDeleted(this);
- }
- }
-
- _bindings.deregister();
- _virtualHost.getQueueRegistry().unregisterQueue(_name);
-
- _managedObject.unregister();
- for (Task task : _deleteTaskList)
- {
- task.doTask(this);
- }
-
- _deleteTaskList.clear();
- stop();
- }
- return getMessageCount();
-
- }
-
- public void stop()
- {
- if (!_stopped.getAndSet(true))
- {
- ReferenceCountingExecutorService.getInstance().releaseExecutorService();
- _entries.stop();
- }
- }
-
- public void deliverAsync()
- {
- _stateChangeCount.incrementAndGet();
-
- Runner runner = new Runner();
-
- if (_asynchronousRunner.compareAndSet(null, runner))
- {
- _asyncDelivery.execute(runner);
- }
- }
-
- public void deliverAsync(Subscription sub)
- {
- _asyncDelivery.execute(new SubFlushRunner(sub));
- }
-
- private class Runner implements ReadWriteRunnable
- {
- public void run()
- {
- try
- {
- processQueue(this);
- }
- catch (AMQException e)
- {
- _logger.error(e);
- }
-
- }
-
- public boolean isRead()
- {
- return false;
- }
-
- public boolean isWrite()
- {
- return true;
- }
- }
-
- private class SubFlushRunner implements ReadWriteRunnable
- {
- private final Subscription _sub;
-
- public SubFlushRunner(Subscription sub)
- {
- _sub = sub;
- }
-
- public void run()
- {
- boolean complete = false;
- try
- {
- complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES));
-
- }
- catch (AMQException e)
- {
- _logger.error(e);
- }
- if (!complete && !_sub.isSuspended())
- {
- _asyncDelivery.execute(this);
- }
-
- }
-
- public boolean isRead()
- {
- return false;
- }
-
- public boolean isWrite()
- {
- return true;
- }
- }
-
- public void flushSubscription(Subscription sub) throws AMQException
- {
- flushSubscription(sub, Long.MAX_VALUE);
- }
-
- public boolean flushSubscription(Subscription sub, Long iterations) throws AMQException
- {
- boolean atTail = false;
-
- while (!sub.isSuspended() && !atTail && iterations != 0)
- {
- try
- {
- sub.getSendLock();
- atTail = attemptDelivery(sub);
- if (atTail && sub.isAutoClose())
- {
- unregisterSubscription(sub);
-
- ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
- converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
- }
- else if (!atTail)
- {
- iterations--;
- }
- }
- finally
- {
- sub.releaseSendLock();
- }
- }
-
- // if there's (potentially) more than one subscription the others will potentially not have been advanced to the
- // next entry they are interested in yet. This would lead to holding on to references to expired messages, etc
- // which would give us memory "leak".
-
- if (!isExclusiveSubscriber())
- {
- advanceAllSubscriptions();
- }
- return atTail;
- }
-
- private boolean attemptDelivery(Subscription sub) throws AMQException
- {
- boolean atTail = false;
- boolean advanced = false;
- boolean subActive = sub.isActive();
- if (subActive)
- {
- QueueEntry node = moveSubscriptionToNextNode(sub);
- if (!(node.isAcquired() || node.isDeleted()))
- {
- if (!sub.isSuspended())
- {
- if (sub.hasInterest(node))
- {
- if (!sub.wouldSuspend(node))
- {
- if (!sub.isBrowser() && !node.acquire(sub))
- {
- sub.restoreCredit(node);
- }
- else
- {
- deliverMessage(sub, node);
-
- if (sub.isBrowser())
- {
- QueueEntry newNode = _entries.next(node);
-
- if (newNode != null)
- {
- advanced = true;
- sub.setLastSeenEntry(node, newNode);
- node = sub.getLastSeenEntry();
- }
- }
- }
-
- }
- else // Not enough Credit for message and wouldSuspend
- {
- //QPID-1187 - Treat the subscription as suspended for this message
- // and wait for the message to be removed to continue delivery.
- subActive = false;
- node.addStateChangeListener(new QueueEntryListener(sub, node));
- }
- }
- else
- {
- // this subscription is not interested in this node so we can skip over it
- QueueEntry newNode = _entries.next(node);
- if (newNode != null)
- {
- sub.setLastSeenEntry(node, newNode);
- }
- }
- }
-
- }
- atTail = (_entries.next(node) == null) && !advanced;
- }
- return atTail || !subActive;
- }
-
- protected void advanceAllSubscriptions() throws AMQException
- {
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
- while (subscriberIter.advance())
- {
- SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode();
- Subscription sub = subNode.getSubscription();
- moveSubscriptionToNextNode(sub);
- }
- }
-
- private QueueEntry moveSubscriptionToNextNode(final Subscription sub)
- throws AMQException
- {
- QueueEntry node = sub.getLastSeenEntry();
-
- while (node != null && (node.isAcquired() || node.isDeleted() || node.expired()))
- {
- if (!node.isAcquired() && !node.isDeleted() && node.expired())
- {
- if (node.acquire())
- {
- // creating a new final store context per message seems wasteful.
- final StoreContext reapingStoreContext = new StoreContext();
- node.dequeueAndDelete(reapingStoreContext);
- }
- }
- QueueEntry newNode = _entries.next(node);
- if (newNode != null)
- {
- sub.setLastSeenEntry(node, newNode);
- node = sub.getLastSeenEntry();
- }
- else
- {
- break;
- }
-
- }
- return node;
- }
-
- private void processQueue(Runnable runner) throws AMQException
- {
- long stateChangeCount;
- long previousStateChangeCount = Long.MIN_VALUE;
- boolean deliveryIncomplete = true;
-
- int extraLoops = 1;
- Long iterations = new Long(MAX_ASYNC_DELIVERIES);
-
- _asynchronousRunner.compareAndSet(runner, null);
-
- while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner))
- {
- // we want to have one extra loop after every subscription has reached the point where it cannot move
- // further, just in case the advance of one subscription in the last loop allows a different subscription to
- // move forward in the next iteration
-
- if (previousStateChangeCount != stateChangeCount)
- {
- extraLoops = 1;
- }
-
- previousStateChangeCount = stateChangeCount;
- deliveryIncomplete = _subscriptionList.size() != 0;
- boolean done = true;
-
- SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
- //iterate over the subscribers and try to advance their pointer
- while (subscriptionIter.advance())
- {
- boolean closeConsumer = false;
- Subscription sub = subscriptionIter.getNode().getSubscription();
- sub.getSendLock();
- try
- {
- if (sub != null)
- {
-
- QueueEntry node = moveSubscriptionToNextNode(sub);
- if (node != null)
- {
- done = attemptDelivery(sub);
- }
- }
- if (done)
- {
- if (extraLoops == 0)
- {
- deliveryIncomplete = false;
- if (sub.isAutoClose())
- {
- unregisterSubscription(sub);
-
- ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
- converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
- }
- }
- else
- {
- extraLoops--;
- }
- }
- else
- {
- iterations--;
- extraLoops = 1;
- }
- }
- finally
- {
- sub.releaseSendLock();
- }
- }
- _asynchronousRunner.set(null);
- }
-
- // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit
- // therefore we should schedule this runner again (unless someone beats us to it :-) ).
- if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
- {
- _asyncDelivery.execute(runner);
- }
- }
-
-
- public void checkMessageStatus() throws AMQException
- {
-
- final StoreContext storeContext = new StoreContext();
-
- QueueEntryIterator queueListIterator = _entries.iterator();
-
- while (queueListIterator.advance())
- {
- QueueEntry node = queueListIterator.getNode();
- if (!node.isDeleted() && node.expired() && node.acquire())
- {
- node.dequeueAndDelete(storeContext);
- }
- else
- {
- _managedObject.checkForNotification(node);
- }
- }
-
- }
-
-
- public long getMemoryUsageMaximum()
- {
- return _entries.getMemoryUsageMaximum();
- }
-
- public void setMemoryUsageMaximum(long maximumMemoryUsage)
- {
- _entries.setMemoryUsageMaximum(maximumMemoryUsage);
- }
-
- public long getMemoryUsageMinimum()
- {
- return _entries.getMemoryUsageMinimum();
- }
-
- public void setMemoryUsageMinimum(long minimumMemoryUsage)
- {
- _entries.setMemoryUsageMinimum(minimumMemoryUsage);
- }
-
- public long getMinimumAlertRepeatGap()
- {
- return _minimumAlertRepeatGap;
- }
-
- public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap)
- {
- _minimumAlertRepeatGap = minimumAlertRepeatGap;
- }
-
- public long getMaximumMessageAge()
- {
- return _maximumMessageAge;
- }
-
- public void setMaximumMessageAge(long maximumMessageAge)
- {
- _maximumMessageAge = maximumMessageAge;
- if (maximumMessageAge == 0L)
- {
- _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT);
- }
- else
- {
- _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT);
- }
- }
-
- public long getMaximumMessageCount()
- {
- return _maximumMessageCount;
- }
-
- public void setMaximumMessageCount(final long maximumMessageCount)
- {
- _maximumMessageCount = maximumMessageCount;
- if (maximumMessageCount == 0L)
- {
- _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT);
- }
- else
- {
- _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT);
- }
-
- }
-
- public long getMaximumQueueDepth()
- {
- return _maximumQueueDepth;
- }
-
- // Sets the queue depth, the max queue size
- public void setMaximumQueueDepth(final long maximumQueueDepth)
- {
- _maximumQueueDepth = maximumQueueDepth;
- if (maximumQueueDepth == 0L)
- {
- _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT);
- }
- else
- {
- _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT);
- }
-
- }
-
- public long getMaximumMessageSize()
- {
- return _maximumMessageSize;
- }
-
- public void setMaximumMessageSize(final long maximumMessageSize)
- {
- _maximumMessageSize = maximumMessageSize;
- if (maximumMessageSize == 0L)
- {
- _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT);
- }
- else
- {
- _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT);
- }
- }
-
- public Set<NotificationCheck> getNotificationChecks()
- {
- return _notificationChecks;
- }
-
- public ManagedObject getManagedObject()
- {
- return _managedObject;
- }
-
- private final class QueueEntryListener implements QueueEntry.StateChangeListener
- {
- private final QueueEntry _entry;
- private final Subscription _sub;
-
- public QueueEntryListener(final Subscription sub, final QueueEntry entry)
- {
- _entry = entry;
- _sub = sub;
- }
-
- public boolean equals(Object o)
- {
- return _entry == ((QueueEntryListener) o)._entry && _sub == ((QueueEntryListener) o)._sub;
- }
-
- public int hashCode()
- {
- return System.identityHashCode(_entry) ^ System.identityHashCode(_sub);
- }
-
- public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState)
- {
- entry.removeStateChangeListener(this);
- deliverAsync(_sub);
- }
- }
-
- public List<Long> getMessagesOnTheQueue(int num)
- {
- return getMessagesOnTheQueue(num, 0);
- }
-
- public List<Long> getMessagesOnTheQueue(int num, int offset)
- {
- ArrayList<Long> ids = new ArrayList<Long>(num);
- QueueEntryIterator it = _entries.iterator();
- for (int i = 0; i < offset; i++)
- {
- it.advance();
- }
-
- for (int i = 0; i < num && !it.atTail(); i++)
- {
- it.advance();
- ids.add(it.getNode().getMessageId());
- }
- return ids;
- }
-
-
- public String getType()
- {
- return getClass().getSimpleName() + "[" + getName() +"]";
- }
-
- public String toString()
- {
- return getType() + "[Owner:" + _owner + "][Durable:" + _durable + "]";
- }
-
- public void configure(QueueConfiguration config)
- {
- if (config != null)
- {
- setMaximumMessageAge(config.getMaximumMessageAge());
- setMaximumQueueDepth(config.getMaximumQueueDepth());
- setMaximumMessageSize(config.getMaximumMessageSize());
- setMaximumMessageCount(config.getMaximumMessageCount());
- setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap());
- setMemoryUsageMaximum(config.getMemoryUsageMaximum());
- setMemoryUsageMinimum(config.getMemoryUsageMinimum());
- }
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
deleted file mode 100644
index a10e332ef5..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
+++ /dev/null
@@ -1,181 +0,0 @@
-package org.apache.qpid.server.queue;
-
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-public class SimpleQueueEntryList extends FlowableBaseQueueEntryList
-{
-
- private final QueueEntryImpl _head;
-
- private volatile QueueEntryImpl _tail;
-
- static final AtomicReferenceFieldUpdater<SimpleQueueEntryList, QueueEntryImpl>
- _tailUpdater =
- AtomicReferenceFieldUpdater.newUpdater
- (SimpleQueueEntryList.class, QueueEntryImpl.class, "_tail");
-
-
- private final AMQQueue _queue;
-
- static final AtomicReferenceFieldUpdater<QueueEntryImpl, QueueEntryImpl>
- _nextUpdater =
- AtomicReferenceFieldUpdater.newUpdater
- (QueueEntryImpl.class, QueueEntryImpl.class, "_next");
-
- public SimpleQueueEntryList(AMQQueue queue)
- {
- super(queue);
- _queue = queue;
- _head = new QueueEntryImpl(this);
- _tail = _head;
- }
-
- void advanceHead()
- {
- QueueEntryImpl head = _head.nextNode();
- while(head._next != null && head.isDeleted())
- {
-
- final QueueEntryImpl newhead = head.nextNode();
- if(newhead != null)
- {
- _nextUpdater.compareAndSet(_head,head, newhead);
- }
- head = _head.nextNode();
- }
- }
-
-
- public AMQQueue getQueue()
- {
- return _queue;
- }
-
-
- public QueueEntry add(AMQMessage message)
- {
- QueueEntryImpl node = new QueueEntryImpl(this, message);
-
- incrementCounters(node);
-
- for (;;)
- {
- QueueEntryImpl tail = _tail;
- QueueEntryImpl next = tail.nextNode();
- if (tail == _tail)
- {
- if (next == null)
- {
- node.setEntryId(tail.getEntryId()+1);
- if (_nextUpdater.compareAndSet(tail, null, node))
- {
- _tailUpdater.compareAndSet(this, tail, node);
-
- return node;
- }
- }
- else
- {
- _tailUpdater.compareAndSet(this,tail, next);
- }
- }
- }
- }
-
-
- public QueueEntry next(QueueEntry node)
- {
- return ((QueueEntryImpl)node).getNext();
- }
-
- public class QueueEntryIteratorImpl implements QueueEntryIterator
- {
-
- private QueueEntryImpl _lastNode;
-
- QueueEntryIteratorImpl(QueueEntryImpl startNode)
- {
- _lastNode = startNode;
- }
-
-
- public boolean atTail()
- {
- return _lastNode.nextNode() == null;
- }
-
- public QueueEntry getNode()
- {
-
- return _lastNode;
-
- }
-
- public boolean advance()
- {
-
- if(!atTail())
- {
- QueueEntryImpl nextNode = _lastNode.nextNode();
- while(nextNode.isDeleted() && nextNode.nextNode() != null)
- {
- nextNode = nextNode.nextNode();
- }
- _lastNode = nextNode;
- return true;
-
- }
- else
- {
- return false;
- }
-
- }
-
- }
-
-
- public QueueEntryIterator iterator()
- {
- return new QueueEntryIteratorImpl(_head);
- }
-
-
- public QueueEntry getHead()
- {
- return _head;
- }
-
- static class Factory implements QueueEntryListFactory
- {
-
- public QueueEntryList createQueueEntryList(AMQQueue queue)
- {
- return new SimpleQueueEntryList(queue);
- }
-
- }
-
-
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
deleted file mode 100644
index 4c9fe81439..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.StoreContext;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/** A deliverable message. */
-public class TransientAMQMessage implements AMQMessage
-{
- /** Used for debugging purposes. */
- protected static final Logger _log = Logger.getLogger(AMQMessage.class);
-
- protected ContentHeaderBody _contentHeaderBody;
-
- protected MessagePublishInfo _messagePublishInfo;
-
- protected List<ContentChunk> _contentBodies;
-
- protected long _arrivalTime;
-
- protected final Long _messageId;
-
-
- private byte _flags = 0;
-
- private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier;
- private long _expiration;
-
- /**
- * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
- * therefore is memory-efficient.
- */
- private class BodyFrameIterator implements Iterator<AMQDataBlock>
- {
- private int _channel;
-
- private int _index = -1;
- private AMQProtocolSession _protocolSession;
-
- private BodyFrameIterator(AMQProtocolSession protocolSession, int channel)
- {
- _channel = channel;
- _protocolSession = protocolSession;
- }
-
- public boolean hasNext()
- {
- return _index < (getBodyCount() - 1);
- }
-
- public AMQDataBlock next()
- {
- AMQBody cb =
- getProtocolVersionMethodConverter().convertToBody(getContentChunk(++_index));
-
- return new AMQFrame(_channel, cb);
- }
-
- private ProtocolVersionMethodConverter getProtocolVersionMethodConverter()
- {
- return _protocolSession.getMethodRegistry().getProtocolVersionMethodConverter();
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- }
-
- private class BodyContentIterator implements Iterator<ContentChunk>
- {
-
- private int _index = -1;
-
- public boolean hasNext()
- {
- return _index < (getBodyCount() - 1);
- }
-
- public ContentChunk next()
- {
- return getContentChunk(++_index);
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- }
-
- /**
- * Used by SimpleAMQQueueTest, TxAckTest.TestMessage, AbstractHeaderExchangeTestBase.Message
- * These all need refactoring to some sort of MockAMQMessageFactory.
- */
- @Deprecated
- protected TransientAMQMessage(AMQMessage message) throws AMQException
- {
- _messageId = message.getMessageId();
- _flags = ((TransientAMQMessage) message)._flags;
- _contentHeaderBody = message.getContentHeaderBody();
- _messagePublishInfo = message.getMessagePublishInfo();
- }
-
- /**
- * Normal message creation via the MessageFactory uses this constructor
- * Package scope limited as MessageFactory should be used
- *
- * @param messageId
- *
- * @see MessageFactory
- */
- TransientAMQMessage(Long messageId)
- {
- _messageId = messageId;
- }
-
- public String debugIdentity()
- {
- return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() +")";
- }
-
- public void setExpiration(long expiration)
- {
- _expiration = expiration;
- }
-
- public long getExpiration()
- {
- return _expiration;
- }
-
- public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
- {
- return new BodyFrameIterator(protocolSession, channel);
- }
-
- public Iterator<ContentChunk> getContentBodyIterator()
- {
- return new BodyContentIterator();
- }
-
- public ContentHeaderBody getContentHeaderBody()
- {
- return _contentHeaderBody;
- }
-
- public Long getMessageId()
- {
- return _messageId;
- }
-
-
- public long getSize()
- {
- return _contentHeaderBody.bodySize;
- }
-
- public Object getPublisherClientInstance()
- {
- return _sessionIdentifier.getSessionInstance();
- }
-
- public Object getPublisherIdentifier()
- {
- return _sessionIdentifier.getSessionIdentifier();
- }
-
- public void setClientIdentifier(final AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier)
- {
- _sessionIdentifier = sessionIdentifier;
- }
-
- /** From AMQMessageHandle * */
-
- public int getBodyCount()
- {
- return _contentBodies.size();
- }
-
- public ContentChunk getContentChunk(int index)
- {
- if (_contentBodies == null)
- {
- throw new RuntimeException("No ContentBody has been set");
- }
-
- if (index > _contentBodies.size() - 1 || index < 0)
- {
- throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " +
- (_contentBodies.size() - 1));
- }
- return _contentBodies.get(index);
- }
-
- public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody)
- throws AMQException
- {
- if (_contentBodies == null)
- {
- if (isLastContentBody)
- {
- _contentBodies = Collections.singletonList(contentChunk);
- }
- else
- {
- _contentBodies = new ArrayList<ContentChunk>();
- _contentBodies.add(contentChunk);
- }
- }
- else
- {
- _contentBodies.add(contentChunk);
- }
- }
-
- public MessagePublishInfo getMessagePublishInfo()
- {
- return _messagePublishInfo;
- }
-
- public boolean isPersistent()
- {
- return false;
- }
-
- public boolean isImmediate()
- {
- return _messagePublishInfo.isImmediate();
- }
-
- /**
- * This is called when all the content has been received.
- *
- * @param storeContext
- * @param messagePublishInfo
- * @param contentHeaderBody @throws AMQException
- */
- public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo,
- ContentHeaderBody contentHeaderBody)
- throws AMQException
- {
-
- if (contentHeaderBody == null)
- {
- throw new NullPointerException("HeaderBody cannot be null");
- }
-
- if (messagePublishInfo == null)
- {
- throw new NullPointerException("PublishInfo cannot be null");
- }
-
- _arrivalTime = System.currentTimeMillis();
-
-
- _contentHeaderBody = contentHeaderBody;
- _messagePublishInfo = messagePublishInfo;
-
- updateHeaderAndFlags();
- }
-
- public long getArrivalTime()
- {
- return _arrivalTime;
- }
-
- public void recoverFromMessageMetaData(MessageMetaData mmd)
- {
- _arrivalTime = mmd.getArrivalTime();
- _contentHeaderBody = mmd.getContentHeaderBody();
- _messagePublishInfo = mmd.getMessagePublishInfo();
-
- updateHeaderAndFlags();
- }
-
- private void updateHeaderAndFlags()
- {
- if (_contentHeaderBody.bodySize == 0)
- {
- _contentBodies = Collections.EMPTY_LIST;
- }
- }
-
- public void recoverContentBodyFrame(ContentChunk contentChunk, boolean isLastContentBody) throws AMQException
- {
- addContentBodyFrame(null, contentChunk, isLastContentBody);
- }
-
-
- public String toString()
- {
- // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
- // _taken + " by :" + _takenBySubcription;
-
- return "Message[" + debugIdentity() + "]: " + getMessageId() ;
- }
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
deleted file mode 100644
index b09283b11f..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Collections;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-
-/**
- * Contains data that is only used in AMQMessage transiently, e.g. while the content
- * body fragments are arriving.
- *
- * Having this data stored in a separate class means that the AMQMessage class avoids
- * the small overhead of numerous guaranteed-null references.
- *
- * @author Apache Software Foundation
- */
-public class TransientMessageData
-{
- /**
- * Stored temporarily until the header has been received at which point it is used when
- * constructing the handle
- */
- private MessagePublishInfo _messagePublishInfo;
-
- /**
- * Also stored temporarily.
- */
- private ContentHeaderBody _contentHeaderBody;
-
- /**
- * Keeps a track of how many bytes we have received in body frames
- */
- private long _bodyLengthReceived = 0;
-
- /**
- * This is stored during routing, to know the queues to which this message should immediately be
- * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
- * by the message handle.
- */
- private List<AMQQueue> _destinationQueues;
-
- public MessagePublishInfo getMessagePublishInfo()
- {
- return _messagePublishInfo;
- }
-
- public void setMessagePublishInfo(MessagePublishInfo messagePublishInfo)
- {
- _messagePublishInfo = messagePublishInfo;
- }
-
- public List<AMQQueue> getDestinationQueues()
- {
- return _destinationQueues == null ? (List<AMQQueue>) Collections.EMPTY_LIST : _destinationQueues;
- }
-
- public void setDestinationQueues(List<AMQQueue> destinationQueues)
- {
- _destinationQueues = destinationQueues;
- }
-
- public ContentHeaderBody getContentHeaderBody()
- {
- return _contentHeaderBody;
- }
-
- public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
- {
- _contentHeaderBody = contentHeaderBody;
- }
-
- public long getBodyLengthReceived()
- {
- return _bodyLengthReceived;
- }
-
- public void addBodyLength(int value)
- {
- _bodyLengthReceived += value;
- }
-
- public boolean isAllContentReceived() throws AMQException
- {
- return _bodyLengthReceived == _contentHeaderBody.bodySize;
- }
-
- public void addDestinationQueue(AMQQueue queue)
- {
- if(_destinationQueues == null)
- {
- _destinationQueues = new ArrayList<AMQQueue>();
- }
- _destinationQueues.add(queue);
- }
-
- public boolean isPersistent()
- {
- return _contentHeaderBody.properties instanceof BasicContentHeaderProperties &&
- ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() ==
- BasicContentHeaderProperties.PERSISTENT;
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToFlowMessageException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToFlowMessageException.java
deleted file mode 100644
index 03cfed8533..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToFlowMessageException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-public class UnableToFlowMessageException extends Exception
-{
- public UnableToFlowMessageException(long messageId, Exception error)
- {
- super("Unable to Flow Message:"+messageId, error);
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToRecoverMessageException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToRecoverMessageException.java
deleted file mode 100644
index cae5bc6327..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToRecoverMessageException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-public class UnableToRecoverMessageException extends RuntimeException
-{
- public UnableToRecoverMessageException(Exception error)
- {
- super(error);
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/UnauthorizedAccessException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/UnauthorizedAccessException.java
deleted file mode 100644
index 295cb266b9..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/UnauthorizedAccessException.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.RequiredDeliveryException;
-
-/**
- * UnauthorizedAccessException is a {@link RequiredDeliveryException} that represents the failure case where a message
- * is published with a user id different from the one used when creating the connection .
- * The AMQP status code, 403, is always used to report this condition.
- *
- */
-
-public class UnauthorizedAccessException extends RequiredDeliveryException
-{
- public UnauthorizedAccessException(String msg, AMQMessage amqMessage)
- {
- super(msg, amqMessage);
- }
-
- public AMQConstant getReplyCode()
- {
- return AMQConstant.ACCESS_REFUSED;
- }
-}