summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java1095
1 files changed, 0 insertions, 1095 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
deleted file mode 100644
index b6ad62b050..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ /dev/null
@@ -1,1095 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.configuration.Configured;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.management.Managable;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.messageStore.StorableMessage;
-import org.apache.qpid.server.messageStore.StorableQueue;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import javax.management.JMException;
-import java.text.MessageFormat;
-import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described
- * fully in RFC 006.
- */
-public class AMQQueue implements Managable, Comparable, StorableQueue
-{
- public static int s_queueID = 0;
-
- private static final Logger _logger = Logger.getLogger(AMQQueue.class);
-
- private final AMQShortString _name;
-
- // The queueu ID
- int _queueId;
- // The list of enqueued messages.
- Hashtable<Long, StorableMessage> _messages = new Hashtable<Long, StorableMessage>();
-
- /**
- * null means shared
- */
- private final AMQShortString _owner;
-
- private final boolean _durable;
-
- /**
- * If true, this queue is deleted when the last subscriber is removed
- */
- private final boolean _autoDelete;
-
- /**
- * Holds subscribers to the queue.
- */
- private final SubscriptionSet _subscribers;
-
- private final SubscriptionFactory _subscriptionFactory;
-
- private final AtomicInteger _subscriberCount = new AtomicInteger();
-
- private final AtomicBoolean _isExclusive = new AtomicBoolean();
-
- private final AtomicBoolean _deleted = new AtomicBoolean(false);
-
- private List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
-
- /**
- * Manages message delivery.
- */
- private final DeliveryManager _deliveryMgr;
-
- /**
- * Used to track bindings to exchanges so that on deletion they can easily be cancelled.
- */
- private final ExchangeBindings _bindings = new ExchangeBindings(this);
-
- /**
- * Executor on which asynchronous delivery will be carriedout where required
- */
- private final Executor _asyncDelivery;
-
- private final AMQQueueMBean _managedObject;
-
- private final VirtualHost _virtualHost;
-
- /**
- * max allowed size(KB) of a single message
- */
- @Configured(path = "maximumMessageSize", defaultValue = "0")
- public long _maximumMessageSize;
-
- /**
- * max allowed number of messages on a queue.
- */
- @Configured(path = "maximumMessageCount", defaultValue = "0")
- public long _maximumMessageCount;
-
- /**
- * max queue depth for the queue
- */
- @Configured(path = "maximumQueueDepth", defaultValue = "0")
- public long _maximumQueueDepth;
-
- /**
- * maximum message age before alerts occur
- */
- @Configured(path = "maximumMessageAge", defaultValue = "0")
- public long _maximumMessageAge;
-
- /**
- * the minimum interval between sending out consequetive alerts of the same type
- */
- @Configured(path = "minimumAlertRepeatGap", defaultValue = "0")
- public long _minimumAlertRepeatGap;
-
- /**
- * total messages received by the queue since startup.
- */
- public AtomicLong _totalMessagesReceived = new AtomicLong();
-
-
- private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
-
-
- public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
- throws AMQException
- {
- this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(),
- new SubscriptionSet(), new SubscriptionImpl.Factory());
- }
-
- protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete,
- VirtualHost virtualHost, SubscriptionSet subscribers) throws AMQException
- {
- this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers,
- new SubscriptionImpl.Factory());
- }
-
- protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete,
- VirtualHost virtualHost, Executor asyncDelivery, SubscriptionSet subscribers,
- SubscriptionFactory subscriptionFactory) throws AMQException
- {
- if (name == null)
- {
- throw new IllegalArgumentException("Queue name must not be null");
- }
-
- if (virtualHost == null)
- {
- throw new IllegalArgumentException("Virtual Host must not be null");
- }
-
- _name = name;
- _durable = durable;
- _owner = owner;
- _autoDelete = autoDelete;
- _virtualHost = virtualHost;
- _asyncDelivery = asyncDelivery;
-
- _managedObject = createMBean();
- _managedObject.register();
-
- _subscribers = subscribers;
- _subscriptionFactory = subscriptionFactory;
- _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
- _queueId = s_queueID++;
-
- // This ensure that the notification checks for the configured alerts are created.
- setMaximumMessageAge(_maximumMessageAge);
- setMaximumMessageCount(_maximumMessageCount);
- setMaximumMessageSize(_maximumMessageSize);
- setMaximumQueueDepth(_maximumQueueDepth);
-
- }
-
- private AMQQueueMBean createMBean() throws AMQException
- {
- try
- {
- return new AMQQueueMBean(this);
- }
- catch (JMException ex)
- {
- throw new AMQException(null, "AMQQueue MBean creation has failed ", ex);
- }
- }
-
- public final AMQShortString getName()
- {
- return _name;
- }
-
- public boolean isShared()
- {
- return _owner == null;
- }
-
- public boolean isDurable()
- {
- return _durable;
- }
-
- public AMQShortString getOwner()
- {
- return _owner;
- }
-
- public boolean isAutoDelete()
- {
- return _autoDelete;
- }
-
- public boolean isDeleted()
- {
- return _deleted.get();
- }
-
- /**
- * @return no of messages(undelivered) on the queue.
- */
- public int getMessageCount()
- {
- return _deliveryMgr.getQueueMessageCount();
- }
-
- /**
- * @return List of messages(undelivered) on the queue.
- */
- public List<QueueEntry> getMessagesOnTheQueue()
- {
- return _deliveryMgr.getMessages();
- }
-
- /**
- * Returns messages within the given range of message Ids
- *
- * @param fromMessageId
- * @param toMessageId
- * @return List of messages
- */
- public List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId)
- {
- return _deliveryMgr.getMessages(fromMessageId, toMessageId);
- }
-
- public long getQueueDepth()
- {
- return _deliveryMgr.getTotalMessageSize();
- }
-
- /**
- * @param messageId
- * @return QueueEntry with give id if exists. null if QueueEntry with given id doesn't exist.
- */
- public QueueEntry getMessageOnTheQueue(long messageId)
- {
- List<QueueEntry> list = getMessagesOnTheQueue(messageId, messageId);
- if ((list == null) || (list.size() == 0))
- {
- return null;
- }
-
- return list.get(0);
- }
-
- /**
- * moves messages from this queue to another queue. to do this the approach is following- - setup the queue for
- * moving messages (stop the async delivery) - get all the messages available in the given message id range - setup
- * the other queue for moving messages (stop the async delivery) - send these available messages to the other queue
- * (enqueue in other queue) - Once sending to other Queue is successful, remove messages from this queue - remove
- * locks from both queues and start async delivery
- *
- * @param fromMessageId The first message id to move.
- * @param toMessageId The last message id to move.
- * @param queueName The queue to move the messages to.
- * @param storeContext The context of the message store under which to perform the move. This is associated with
- * the stores transactional context.
- */
- public synchronized void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
- StoreContext storeContext)
- {
- AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
-
- MessageStore fromStore = getVirtualHost().getMessageStore();
- MessageStore toStore = toQueue.getVirtualHost().getMessageStore();
-
- if (toStore != fromStore)
- {
- throw new RuntimeException("Can only move messages between queues on the same message store.");
- }
-
- try
- {
- // Obtain locks to prevent activity on the queues being moved between.
- startMovingMessages();
- toQueue.startMovingMessages();
-
- // Get the list of messages to move.
- List<QueueEntry> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
-
- try
- {
- fromStore.beginTran(storeContext);
-
- // Move the messages in on the message store.
- for (QueueEntry entry : foundMessagesList)
- {
- AMQMessage message = entry.getMessage();
- fromStore.dequeueMessage(storeContext, _name, message.getMessageId());
- toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId());
- }
-
- // Commit and flush the move transcations.
- try
- {
- fromStore.commitTran(storeContext);
- }
- catch (AMQException e)
- {
- throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
- }
-
- // Move the messages on the in-memory queues.
- toQueue.enqueueMovedMessages(storeContext, foundMessagesList);
- _deliveryMgr.removeMovedMessages(foundMessagesList);
- }
- // Abort the move transactions on move failures.
- catch (AMQException e)
- {
- try
- {
- fromStore.abortTran(storeContext);
- }
- catch (AMQException ae)
- {
- throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae);
- }
- }
- }
- // Release locks to allow activity on the queues being moved between to continue.
- finally
- {
- toQueue.stopMovingMessages();
- stopMovingMessages();
- }
- }
-
- /**
- * Copies messages on this queue to another queue, and also commits the move on the message store. Delivery activity
- * on the queues being moved between is suspended during the move.
- *
- * @param fromMessageId The first message id to move.
- * @param toMessageId The last message id to move.
- * @param queueName The queue to move the messages to.
- * @param storeContext The context of the message store under which to perform the move. This is associated with
- * the stores transactional context.
- */
- public synchronized void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
- StoreContext storeContext)
- {
- AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
-
- MessageStore fromStore = getVirtualHost().getMessageStore();
- MessageStore toStore = toQueue.getVirtualHost().getMessageStore();
-
- if (toStore != fromStore)
- {
- throw new RuntimeException("Can only move messages between queues on the same message store.");
- }
-
- try
- {
- // Obtain locks to prevent activity on the queues being moved between.
- startMovingMessages();
- toQueue.startMovingMessages();
-
- // Get the list of messages to move.
- List<QueueEntry> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
-
- try
- {
- fromStore.beginTran(storeContext);
-
- // Move the messages in on the message store.
- for (QueueEntry entry : foundMessagesList)
- {
- AMQMessage message = entry.getMessage();
- toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId());
- message.takeReference();
- }
-
- // Commit and flush the move transcations.
- try
- {
- fromStore.commitTran(storeContext);
- }
- catch (AMQException e)
- {
- throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
- }
-
- // Move the messages on the in-memory queues.
- toQueue.enqueueMovedMessages(storeContext, foundMessagesList);
- }
- // Abort the move transactions on move failures.
- catch (AMQException e)
- {
- try
- {
- fromStore.abortTran(storeContext);
- }
- catch (AMQException ae)
- {
- throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae);
- }
- }
- }
- // Release locks to allow activity on the queues being moved between to continue.
- finally
- {
- toQueue.stopMovingMessages();
- stopMovingMessages();
- }
- }
-
- /**
- * Removes messages from this queue, and also commits the remove on the message store. Delivery activity
- * on the queues being moved between is suspended during the remove.
- *
- * @param fromMessageId The first message id to move.
- * @param toMessageId The last message id to move.
- * @param storeContext The context of the message store under which to perform the move. This is associated with
- * the stores transactional context.
- */
- public synchronized void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext)
- {
- MessageStore fromStore = getVirtualHost().getMessageStore();
-
- try
- {
- // Obtain locks to prevent activity on the queues being moved between.
- startMovingMessages();
-
- // Get the list of messages to move.
- List<QueueEntry> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
-
- try
- {
- fromStore.beginTran(storeContext);
-
- // remove the messages in on the message store.
- for (QueueEntry entry : foundMessagesList)
- {
- AMQMessage message = entry.getMessage();
- fromStore.dequeueMessage(storeContext, _name, message.getMessageId());
- }
-
- // Commit and flush the move transcations.
- try
- {
- fromStore.commitTran(storeContext);
- }
- catch (AMQException e)
- {
- throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
- }
-
- // remove the messages on the in-memory queues.
- _deliveryMgr.removeMovedMessages(foundMessagesList);
- }
- // Abort the move transactions on move failures.
- catch (AMQException e)
- {
- try
- {
- fromStore.abortTran(storeContext);
- }
- catch (AMQException ae)
- {
- throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae);
- }
- }
- }
- // Release locks to allow activity on the queues being moved between to continue.
- finally
- {
- stopMovingMessages();
- }
- }
-
- public void startMovingMessages()
- {
- _deliveryMgr.startMovingMessages();
- }
-
- private void enqueueMovedMessages(StoreContext storeContext, List<QueueEntry> messageList)
- {
- _deliveryMgr.enqueueMovedMessages(storeContext, messageList);
- _totalMessagesReceived.addAndGet(messageList.size());
- }
-
- public void stopMovingMessages()
- {
- _deliveryMgr.stopMovingMessages();
- _deliveryMgr.processAsync(_asyncDelivery);
- }
-
- /**
- * @return MBean object associated with this Queue
- */
- public ManagedObject getManagedObject()
- {
- return _managedObject;
- }
-
- public long getMaximumMessageSize()
- {
- return _maximumMessageSize;
- }
-
- public void setMaximumMessageSize(final long maximumMessageSize)
- {
- _maximumMessageSize = maximumMessageSize;
- if(maximumMessageSize == 0L)
- {
- _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT);
- }
- else
- {
- _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT);
- }
- }
-
- public int getConsumerCount()
- {
- return _subscribers.size();
- }
-
- public int getActiveConsumerCount()
- {
- return _subscribers.getWeight();
- }
-
- public long getReceivedMessageCount()
- {
- return _totalMessagesReceived.get();
- }
-
- public long getMaximumMessageCount()
- {
- return _maximumMessageCount;
- }
-
- public void setMaximumMessageCount(final long maximumMessageCount)
- {
- _maximumMessageCount = maximumMessageCount;
- if(maximumMessageCount == 0L)
- {
- _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT);
- }
- else
- {
- _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT);
- }
-
-
-
- }
-
- public long getMaximumQueueDepth()
- {
- return _maximumQueueDepth;
- }
-
- // Sets the queue depth, the max queue size
- public void setMaximumQueueDepth(final long maximumQueueDepth)
- {
- _maximumQueueDepth = maximumQueueDepth;
- if(maximumQueueDepth == 0L)
- {
- _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT);
- }
- else
- {
- _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT);
- }
-
- }
-
- public long getOldestMessageArrivalTime()
- {
- return _deliveryMgr.getOldestMessageArrival();
-
- }
-
- /** Removes the QueueEntry from the top of the queue. */
- public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException
- {
- _deliveryMgr.removeAMessageFromTop(storeContext);
- }
-
- /**
- * removes all the messages from the queue.
- */
- public synchronized long clearQueue(StoreContext storeContext) throws AMQException
- {
- return _deliveryMgr.clearAllMessages(storeContext);
- }
-
- public void bind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException
- {
- exchange.registerQueue(routingKey, this, arguments);
- if (isDurable() && exchange.isDurable())
- {
- _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments);
- //DTX MessageStore
-// try
-// {
-// _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments);
-// }
-// catch (InternalErrorException e)
-// {
-// throw new AMQException(null, "Problem binding queue ", e);
-// }
- }
-
- _bindings.addBinding(routingKey, arguments, exchange);
- }
-
- public void unBind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException
- {
- exchange.deregisterQueue(routingKey, this, arguments);
- if (isDurable() && exchange.isDurable())
- {
-
- _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments);
- //DTX MessageStore
-// try
-// {
-// _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments);
-// }
-// catch (InternalErrorException e)
-// {
-// throw new AMQException(null, "problem unbinding queue", e);
-// }
- }
-
- _bindings.remove(routingKey, arguments, exchange);
- }
-
- public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks,
- FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException
- {
- if (incrementSubscriberCount() > 1)
- {
- if (isExclusive())
- {
- decrementSubscriberCount();
- throw new ExistingExclusiveSubscriptionException();
- }
- else if (exclusive)
- {
- decrementSubscriberCount();
- throw new ExistingSubscriptionPreventsExclusiveException();
- }
-
- }
- else if (exclusive)
- {
- setExclusive(true);
- }
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug(MessageFormat.format(
- "Registering protocol session {0} with channel {1} and " + "consumer tag {2} with {3}", ps, channel,
- consumerTag, this));
- }
-
- Subscription subscription =
- _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this);
-
- if (subscription.filtersMessages())
- {
- if (_deliveryMgr.hasQueuedMessages())
- {
- _deliveryMgr.populatePreDeliveryQueue(subscription);
- }
- }
-
- _subscribers.addSubscriber(subscription);
- if(exclusive)
- {
- _subscribers.setExclusive(true);
- }
- }
-
- private boolean isExclusive()
- {
- return _isExclusive.get();
- }
-
- private void setExclusive(boolean exclusive)
- {
- _isExclusive.set(exclusive);
- }
-
- private int incrementSubscriberCount()
- {
- return _subscriberCount.incrementAndGet();
- }
-
- private int decrementSubscriberCount()
- {
- return _subscriberCount.decrementAndGet();
- }
-
- public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug(MessageFormat.format(
- "Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel,
- consumerTag, this));
- }
-
- _subscribers.setExclusive(false);
- Subscription removedSubscription;
-
- if ((removedSubscription =
- _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps, consumerTag)))
- == null)
- {
- throw new AMQException(null, "Protocol session with channel " + channel + " and consumer tag " + consumerTag
- + " and protocol session key " + ps.getKey() + " not registered with queue " + this, null);
- }
-
- removedSubscription.close();
- setExclusive(false);
- decrementSubscriberCount();
-
- // if we are eligible for auto deletion, unregister from the queue registry
- if (_autoDelete && _subscribers.isEmpty())
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Auto-deleteing queue:" + this);
- }
-
- autodelete();
- // we need to manually fire the event to the removed subscription (which was the last one left for this
- // queue. This is because the delete method uses the subscription set which has just been cleared
- removedSubscription.queueDeleted(this);
- }
- }
-
- public boolean isUnused()
- {
- return _subscribers.isEmpty();
- }
-
- public boolean isEmpty()
- {
- return !_deliveryMgr.hasQueuedMessages();
- }
-
- public int delete(boolean checkUnused, boolean checkEmpty) throws AMQException
- {
- if (checkUnused && !_subscribers.isEmpty())
- {
- _logger.info("Will not delete " + this + " as it is in use.");
-
- return 0;
- }
- else if (checkEmpty && _deliveryMgr.hasQueuedMessages())
- {
- _logger.info("Will not delete " + this + " as it is not empty.");
-
- return 0;
- }
- else
- {
- delete();
-
- return _deliveryMgr.getQueueMessageCount();
- }
- }
-
- public void delete() throws AMQException
- {
- if (!_deleted.getAndSet(true))
- {
- _subscribers.queueDeleted(this);
- _bindings.deregister();
- _virtualHost.getQueueRegistry().unregisterQueue(_name);
- _managedObject.unregister();
- for (Task task : _deleteTaskList)
- {
- task.doTask(this);
- }
-
- _deleteTaskList.clear();
- }
- }
-
- protected void autodelete() throws AMQException
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug(MessageFormat.format("autodeleting {0}", this));
- }
-
- delete();
- }
-
- /*public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
- {
- // fixme not sure what this is doing. should we be passing deliverFirst through here?
- // This code is not used so when it is perhaps it should
- _deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst);
- try
- {
- msg.checkDeliveredToConsumer();
- updateReceivedMessageCount(msg);
- }
- catch (NoConsumersException e)
- {
- // as this message will be returned, it should be removed
- // from the queue:
- dequeue(storeContext, msg);
- }
- }*/
-
- // public DeliveryManager getDeliveryManager()
- // {
- // return _deliveryMgr;
- // }
-
- public void process(StoreContext storeContext, QueueEntry entry, boolean deliverFirst) throws AMQException
- {
- AMQMessage msg = entry.getMessage();
- _deliveryMgr.deliver(storeContext, _name, entry, deliverFirst);
- try
- {
- msg.checkDeliveredToConsumer();
- updateReceivedMessageCount(entry);
- }
- catch (NoConsumersException e)
- {
- // as this message will be returned, it should be removed
- // from the queue:
- dequeue(storeContext, entry);
- }
- }
-
- public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
- {
- try
- {
- entry.getMessage().dequeue(storeContext, this);
- }
- catch (MessageCleanupException e)
- {
- // Message was dequeued, but could not then be deleted
- // though it is no longer referenced. This should be very
- // rare and can be detected and cleaned up on recovery or
- // done through some form of manual intervention.
- _logger.error(e, e);
- }
- catch (AMQException e)
- {
- throw new FailedDequeueException(_name.toString(), e);
- }
- }
-
- public void deliverAsync()
- {
- _deliveryMgr.processAsync(_asyncDelivery);
- }
-
- protected SubscriptionManager getSubscribers()
- {
- return _subscribers;
- }
-
- protected void updateReceivedMessageCount(QueueEntry entry) throws AMQException
- {
- AMQMessage msg = entry.getMessage();
-
- if (!msg.isRedelivered())
- {
- _totalMessagesReceived.incrementAndGet();
- }
-
- try
- {
- _managedObject.checkForNotification(msg);
- }
- catch (JMException e)
- {
- throw new AMQException(null, "Unable to get notification from manage queue: " + e, e);
- }
- }
-
- public boolean equals(Object o)
- {
- if (this == o)
- {
- return true;
- }
-
- if ((o == null) || (getClass() != o.getClass()))
- {
- return false;
- }
-
- final AMQQueue amqQueue = (AMQQueue) o;
-
- return (_name.equals(amqQueue._name));
- }
-
- public int hashCode()
- {
- return _name.hashCode();
- }
-
- public String toString()
- {
- return "Queue(" + _name + ")@" + System.identityHashCode(this);
- }
-
- public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException
- {
- return _deliveryMgr.performGet(session, channel, acks);
- }
-
- public QueueRegistry getQueueRegistry()
- {
- return _virtualHost.getQueueRegistry();
- }
-
- public VirtualHost getVirtualHost()
- {
- return _virtualHost;
- }
-
- public static interface Task
- {
- public void doTask(AMQQueue queue) throws AMQException;
- }
-
- public void addQueueDeleteTask(Task task)
- {
- _deleteTaskList.add(task);
- }
-
- public long getMinimumAlertRepeatGap()
- {
- return _minimumAlertRepeatGap;
- }
-
- public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap)
- {
- _minimumAlertRepeatGap = minimumAlertRepeatGap;
- }
-
- public long getMaximumMessageAge()
- {
- return _maximumMessageAge;
- }
-
- public void setMaximumMessageAge(long maximumMessageAge)
- {
- _maximumMessageAge = maximumMessageAge;
- if(maximumMessageAge == 0L)
- {
- _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT);
- }
- else
- {
- _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT);
- }
- }
-
- public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, QueueEntry entry)
- {
- _deliveryMgr.subscriberHasPendingResend(hasContent, subscription, entry);
- }
-
- /*
- * TGM: wow, this is not going to have a high fun quotient
- */
-
- // ========================================================================
- // Interface StorableQueue
- // ========================================================================
-
- public int getQueueID()
- {
- return _queueId;
- }
-
- public void setQueueID(int id)
- {
- _queueId = id;
- }
-
- public void dequeue(StorableMessage m)
- {
- _messages.remove(m.getMessageId());
- }
-
- public void enqueue(StorableMessage m)
- {
- _messages.put(m.getMessageId(), m);
- }
-
- // ========================================================================
- // Used by the Store
- // ========================================================================
-
- /**
- * Get the list of enqueud messages
- *
- * @return The list of enqueud messages
- */
- public Collection<StorableMessage> getAllEnqueuedMessages()
- {
- return _messages.values();
- }
-
- /**
- * Get the enqueued message identified by messageID
- *
- * @param messageId the id of the enqueued message to recover
- * @return The enqueued message with the specified id
- */
- public StorableMessage getEnqueuedMessage(long messageId)
- {
- return _messages.get(messageId);
- }
-
- public QueueEntry createEntry(AMQMessage amqMessage)
- {
- return new QueueEntry(this, amqMessage);
- }
-
- public int compareTo(Object o)
- {
- return _name.compareTo(((AMQQueue) o).getName());
- }
-
-
- public void removeExpiredIfNoSubscribers() throws AMQException
- {
- synchronized(_subscribers.getChangeLock())
- {
- if(_subscribers.isEmpty())
- {
- _deliveryMgr.removeExpired();
- }
- }
- }
-
- public final Set<NotificationCheck> getNotificationChecks()
- {
- return _notificationChecks;
- }
-}