summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java496
1 files changed, 203 insertions, 293 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index d7eb304c92..d42bd6cf03 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -19,8 +19,10 @@
package org.apache.qpid.server.queue;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -28,12 +30,11 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import javax.management.JMException;
-
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
@@ -52,7 +53,6 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.QueueActor;
import org.apache.qpid.server.logging.messages.QueueMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
-import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -70,6 +70,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper
{
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
+
private static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
private static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group";
private static final String QPID_DEFAULT_MESSAGE_GROUP = "qpid.default-message-group";
@@ -77,11 +78,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
// TODO - should make this configurable at the vhost / broker level
private static final int DEFAULT_MAX_GROUPS = 255;
-
private final VirtualHost _virtualHost;
private final AMQShortString _name;
- private final String _resourceName;
/** null means shared */
private final AMQShortString _owner;
@@ -118,6 +117,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
private final AtomicLong _dequeueCount = new AtomicLong();
private final AtomicLong _dequeueSize = new AtomicLong();
+ private final AtomicLong _enqueueCount = new AtomicLong();
private final AtomicLong _enqueueSize = new AtomicLong();
private final AtomicLong _persistentMessageEnqueueSize = new AtomicLong();
private final AtomicLong _persistentMessageDequeueSize = new AtomicLong();
@@ -130,6 +130,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
private final AtomicLong _byteTxnDequeues = new AtomicLong(0);
private final AtomicLong _unackedMsgCount = new AtomicLong(0);
private final AtomicLong _unackedMsgCountHigh = new AtomicLong(0);
+ private final AtomicLong _unackedMsgBytes = new AtomicLong();
private final AtomicInteger _bindingCountHigh = new AtomicInteger();
@@ -173,7 +174,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
private LogSubject _logSubject;
private LogActor _logActor;
- private AMQQueueMBean _managedObject;
private static final String SUB_FLUSH_RUNNER = "SUB_FLUSH_RUNNER";
private boolean _nolocal;
@@ -185,12 +185,19 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
//TODO : persist creation time
private long _createTime = System.currentTimeMillis();
+ private UUID _qmfId;
private ConfigurationPlugin _queueConfiguration;
/** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */
private int _maximumDeliveryCount = ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount();
private final MessageGroupManager _messageGroupManager;
+ private final Collection<SubscriptionRegistrationListener> _subscriptionListeners =
+ new ArrayList<SubscriptionRegistrationListener>();
+
+ private AMQQueue.NotificationListener _notificationListener;
+ private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
+
protected SimpleAMQQueue(UUID id, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments)
{
this(id, name, durable, owner, autoDelete, exclusive,virtualHost, new SimpleQueueEntryList.Factory(), arguments);
@@ -227,17 +234,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
_name = name;
- _resourceName = String.valueOf(name);
_durable = durable;
_owner = owner;
_autoDelete = autoDelete;
_exclusive = exclusive;
_virtualHost = virtualHost;
_entries = entryListFactory.createQueueEntryList(this);
- _arguments = arguments;
+ _arguments = arguments == null ? new HashMap<String, Object>() : new HashMap<String, Object>(arguments);
_id = id;
-
+ _qmfId = getConfigStore().createId();
_asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
_logSubject = new QueueLogSubject(this);
@@ -255,16 +261,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
getConfigStore().addConfiguredObject(this);
- try
- {
- _managedObject = new AMQQueueMBean(this);
- _managedObject.register();
- }
- catch (JMException e)
- {
- _logger.error("AMQQueue MBean creation has failed ", e);
- }
-
if(arguments != null && arguments.containsKey(QPID_GROUP_HEADER_KEY))
{
if(arguments.containsKey(QPID_SHARED_MSG_GROUP) && String.valueOf(arguments.get(QPID_SHARED_MSG_GROUP)).equals("1"))
@@ -302,7 +298,22 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
public void execute(Runnable runnable)
{
- _asyncDelivery.execute(runnable);
+ try
+ {
+ _asyncDelivery.execute(runnable);
+ }
+ catch (RejectedExecutionException ree)
+ {
+ if (_stopped.get())
+ {
+ // Ignore - SubFlusherRunner or QueueRunner submitted execution as queue was being stopped.
+ }
+ else
+ {
+ _logger.error("Unexpected rejected execution", ree);
+ throw ree;
+ }
+ }
}
public AMQShortString getNameShortString()
@@ -320,6 +331,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return _id;
}
+ @Override
+ public UUID getQMFId()
+ {
+ return _qmfId;
+ }
+
public QueueConfigType getConfigType()
{
return QueueConfigType.getInstance();
@@ -339,15 +356,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
{
return _exclusive;
}
-
- public void setExclusive(boolean exclusive) throws AMQException
+
+ public void setExclusive(boolean exclusive)
{
_exclusive = exclusive;
-
- if(isDurable())
- {
- getVirtualHost().getMessageStore().updateQueue(this);
- }
}
public Exchange getAlternateExchange()
@@ -368,22 +380,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
_alternateExchange = exchange;
}
- public void setAlternateExchange(String exchangeName)
- {
- if(exchangeName == null || exchangeName.equals(""))
- {
- _alternateExchange = null;
- return;
- }
-
- Exchange exchange = getVirtualHost().getExchangeRegistry().getExchange(new AMQShortString(exchangeName));
- if (exchange == null)
- {
- throw new RuntimeException("Exchange '" + exchangeName + "' is not registered with the VirtualHost.");
- }
- setAlternateExchange(exchange);
- }
-
+ /**
+ * Arguments used to create this queue. The caller is assured
+ * that null will never be returned.
+ */
public Map<String, Object> getArguments()
{
return _arguments;
@@ -430,8 +430,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
{
throw new AMQSecurityException("Permission denied");
}
-
-
+
+
if (hasExclusiveSubscriber())
{
throw new ExistingExclusiveSubscription();
@@ -463,15 +463,24 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
{
subscription.setNoLocal(_nolocal);
}
+
+ synchronized (_subscriptionListeners)
+ {
+ for(SubscriptionRegistrationListener listener : _subscriptionListeners)
+ {
+ listener.subscriptionRegistered(this, subscription);
+ }
+ }
+
_subscriptionList.add(subscription);
-
+
//Increment consumerCountHigh if necessary. (un)registerSubscription are both
//synchronized methods so we don't need additional synchronization here
if(_counsumerCountHigh.get() < getConsumerCount())
{
_counsumerCountHigh.incrementAndGet();
}
-
+
if (isDeleted())
{
subscription.queueDeleted(this);
@@ -507,6 +516,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
resetSubPointersForGroups(subscription, true);
}
+ synchronized (_subscriptionListeners)
+ {
+ for(SubscriptionRegistrationListener listener : _subscriptionListeners)
+ {
+ listener.subscriptionUnregistered(this, subscription);
+ }
+ }
+
// auto-delete queues must be deleted if there are no remaining subscribers
if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 )
@@ -526,6 +543,34 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
+ public Collection<Subscription> getConsumers()
+ {
+ List<Subscription> consumers = new ArrayList<Subscription>();
+ SubscriptionList.SubscriptionNodeIterator iter = _subscriptionList.iterator();
+ while(iter.advance())
+ {
+ consumers.add(iter.getNode().getSubscription());
+ }
+ return consumers;
+
+ }
+
+ public void addSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+ {
+ synchronized (_subscriptionListeners)
+ {
+ _subscriptionListeners.add(listener);
+ }
+ }
+
+ public void removeSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+ {
+ synchronized (_subscriptionListeners)
+ {
+ _subscriptionListeners.remove(listener);
+ }
+ }
+
public void resetSubPointersForGroups(Subscription subscription, boolean clearAssignments)
{
QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription);
@@ -576,10 +621,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
break;
}
}
-
+
reconfigure();
}
-
+
private void reconfigure()
{
//Reconfigure the queue for to reflect this new binding.
@@ -604,7 +649,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
public void removeBinding(final Binding binding)
{
_bindings.remove(binding);
-
+
reconfigure();
}
@@ -718,10 +763,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
}
- if(_managedObject != null)
- {
- _managedObject.checkForNotification(entry.getMessage());
- }
+ checkForNotification(entry.getMessage());
if(action != null)
{
@@ -738,8 +780,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
{
try
{
- if (!sub.isSuspended()
- && subscriptionReadyAndHasInterest(sub, entry)
+ if (!sub.isSuspended()
+ && subscriptionReadyAndHasInterest(sub, entry)
&& mightAssign(sub, entry)
&& !sub.wouldSuspend(entry))
{
@@ -788,6 +830,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
{
long size = message.getSize();
getAtomicQueueSize().addAndGet(size);
+ _enqueueCount.incrementAndGet();
_enqueueSize.addAndGet(size);
if(message.isPersistent() && isDurable())
{
@@ -796,19 +839,29 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
}
+ public long getTotalDequeueCount()
+ {
+ return _dequeueCount.get();
+ }
+
+ public long getTotalEnqueueCount()
+ {
+ return _enqueueCount.get();
+ }
+
private void incrementQueueCount()
{
getAtomicQueueCount().incrementAndGet();
}
-
+
private void incrementTxnEnqueueStats(final ServerMessage message)
{
_msgTxnEnqueues.incrementAndGet();
_byteTxnEnqueues.addAndGet(message.getSize());
}
-
+
private void incrementTxnDequeueStats(QueueEntry entry)
- {
+ {
_msgTxnDequeues.incrementAndGet();
_byteTxnDequeues.addAndGet(entry.getSize());
}
@@ -819,7 +872,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
setLastSeenEntry(sub, entry);
_deliveredMessages.incrementAndGet();
- incrementUnackedMsgCount();
+ incrementUnackedMsgCount(entry);
sub.send(entry, batch);
}
@@ -833,12 +886,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
private void setLastSeenEntry(final Subscription sub, final QueueEntry entry)
{
QueueContext subContext = (QueueContext) sub.getQueueContext();
- QueueEntry releasedEntry = subContext.getReleasedEntry();
-
- QueueContext._lastSeenUpdater.set(subContext, entry);
- if(releasedEntry == entry)
+ if (subContext != null)
{
- QueueContext._releasedUpdater.compareAndSet(subContext, releasedEntry, null);
+ QueueEntry releasedEntry = subContext.getReleasedEntry();
+
+ QueueContext._lastSeenUpdater.set(subContext, entry);
+ if(releasedEntry == entry)
+ {
+ QueueContext._releasedUpdater.compareAndSet(subContext, releasedEntry, null);
+ }
}
}
@@ -887,7 +943,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
{
_deliveredMessages.decrementAndGet();
}
-
+
if(sub != null && sub.isSessionTransactional())
{
incrementTxnDequeueStats(entry);
@@ -940,11 +996,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
}
+
+
public int getConsumerCount()
{
return _subscriptionList.size();
}
-
+
public int getConsumerCountHigh()
{
return _counsumerCountHigh.get();
@@ -1148,7 +1206,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
- public void visit(final Visitor visitor)
+ public void visit(final QueueEntryVisitor visitor)
{
QueueEntryIterator queueListIterator = _entries.iterator();
@@ -1195,192 +1253,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
- public void moveMessagesToAnotherQueue(final long fromMessageId,
- final long toMessageId,
- String destinationQueueName) throws IllegalArgumentException
- {
-
- final AMQQueue toQueue = getValidatedDestinationQueue(destinationQueueName);
-
- List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
- {
-
- public boolean accept(QueueEntry entry)
- {
- final long messageId = entry.getMessage().getMessageNumber();
- return (messageId >= fromMessageId)
- && (messageId <= toMessageId)
- && entry.acquire();
- }
-
- public boolean filterComplete()
- {
- return false;
- }
- });
-
-
- final ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
- boolean shouldRollback = true;
- try
- {
- // Move the messages in on the message store.
- for (final QueueEntry entry : entries)
- {
- final ServerMessage message = entry.getMessage();
- txn.enqueue(toQueue, message,
- new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- try
- {
- toQueue.enqueue(message);
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public void onRollback()
- {
- entry.release();
- }
- });
- txn.dequeue(this, message,
- new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- entry.discard();
- }
-
- public void onRollback()
- {
-
- }
- });
- }
- txn.commit();
- shouldRollback = false;
- }
- finally
- {
- if (shouldRollback)
- {
- txn.rollback();
- }
- }
-
- }
-
- public void copyMessagesToAnotherQueue(final long fromMessageId,
- final long toMessageId,
- String destinationQueueName) throws IllegalArgumentException
- {
- final AMQQueue toQueue = getValidatedDestinationQueue(destinationQueueName);
-
- List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
- {
-
- public boolean accept(QueueEntry entry)
- {
- final long messageId = entry.getMessage().getMessageNumber();
- return ((messageId >= fromMessageId)
- && (messageId <= toMessageId));
- }
-
- public boolean filterComplete()
- {
- return false;
- }
- });
-
- final ServerTransaction txn = new LocalTransaction(_virtualHost.getMessageStore());
- boolean shouldRollback = true;
- try
- {
- // Copy the messages in on the message store.
- for (QueueEntry entry : entries)
- {
- final ServerMessage message = entry.getMessage();
-
- txn.enqueue(toQueue, message, new ServerTransaction.Action()
- {
- public void postCommit()
- {
- try
- {
- toQueue.enqueue(message);
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public void onRollback()
- {
- }
- });
-
- }
-
- txn.commit();
- shouldRollback = false;
- }
- finally
- {
- if (shouldRollback)
- {
- txn.rollback();
- }
- }
-
- }
-
- private AMQQueue getValidatedDestinationQueue(String queueName)
- {
- final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
- if (toQueue == null)
- {
- throw new IllegalArgumentException("Queue '" + queueName + "' is not registered with the virtualhost.");
- }
- else if (toQueue == this)
- {
- throw new IllegalArgumentException("The destination queue can't be the same as the source queue");
- }
- return toQueue;
- }
-
- public void removeMessagesFromQueue(long fromMessageId, long toMessageId)
- {
-
- QueueEntryIterator queueListIterator = _entries.iterator();
-
- while (queueListIterator.advance())
- {
- QueueEntry node = queueListIterator.getNode();
-
- final ServerMessage message = node.getMessage();
- if(message != null)
- {
- final long messageId = message.getMessageNumber();
-
- if ((messageId >= fromMessageId)
- && (messageId <= toMessageId)
- && node.acquire())
- {
- dequeueEntry(node);
- }
- }
- }
-
- }
-
public void purge(final long request) throws AMQException
{
clear(request);
@@ -1393,6 +1265,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
// ------ Management functions
+ // TODO - now only used by the tests
public void deleteMessageFromTop()
{
QueueEntryIterator queueListIterator = _entries.iterator();
@@ -1411,7 +1284,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
public long clearQueue() throws AMQException
- {
+ {
return clear(0l);
}
@@ -1422,7 +1295,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
{
throw new AMQSecurityException("Permission denied: queue " + getName());
}
-
+
QueueEntryIterator queueListIterator = _entries.iterator();
long count = 0;
@@ -1489,7 +1362,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
{
throw new AMQSecurityException("Permission denied: " + getName());
}
-
+
if (!_deleted.getAndSet(true))
{
@@ -1617,12 +1490,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
txn.commit();
-
- if(_managedObject!=null)
- {
- _managedObject.unregister();
- }
-
for (Task task : _deleteTaskList)
{
task.doTask(this);
@@ -2101,16 +1968,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
else
{
- if (_managedObject != null)
+ // There is a chance that the node could be deleted by
+ // the time the check actually occurs. So verify we
+ // can actually get the message to perform the check.
+ ServerMessage msg = node.getMessage();
+ if (msg != null)
{
- // There is a chance that the node could be deleted by
- // the time the check actually occurs. So verify we
- // can actually get the message to perform the check.
- ServerMessage msg = node.getMessage();
- if (msg != null)
- {
- _managedObject.checkForNotification(msg);
- }
+ checkForNotification(msg);
}
}
}
@@ -2235,11 +2099,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return _notificationChecks;
}
- public ManagedObject getManagedObject()
- {
- return _managedObject;
- }
-
private final class QueueEntryListener implements QueueEntry.StateChangeListener
{
@@ -2330,12 +2189,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return _queueConfiguration;
}
- public String getResourceName()
- {
- return _resourceName;
- }
-
-
public ConfigStore getConfigStore()
{
return getVirtualHost().getConfigStore();
@@ -2355,22 +2208,22 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
{
return _dequeueSize.get();
}
-
+
public long getByteTxnEnqueues()
{
return _byteTxnEnqueues.get();
}
-
+
public long getByteTxnDequeues()
{
return _byteTxnDequeues.get();
}
-
+
public long getMsgTxnEnqueues()
{
return _msgTxnEnqueues.get();
}
-
+
public long getMsgTxnDequeues()
{
return _msgTxnDequeues.get();
@@ -2407,21 +2260,28 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
{
return _unackedMsgCountHigh.get();
}
-
+
public long getUnackedMessageCount()
{
return _unackedMsgCount.get();
}
-
- public void decrementUnackedMsgCount()
+
+ public long getUnackedMessageBytes()
+ {
+ return _unackedMsgBytes.get();
+ }
+
+ public void decrementUnackedMsgCount(QueueEntry queueEntry)
{
_unackedMsgCount.decrementAndGet();
+ _unackedMsgBytes.addAndGet(-queueEntry.getSize());
}
-
- private void incrementUnackedMsgCount()
+
+ private void incrementUnackedMsgCount(QueueEntry entry)
{
long unackedMsgCount = _unackedMsgCount.incrementAndGet();
-
+ _unackedMsgBytes.addAndGet(entry.getSize());
+
long unackedMsgCountHigh;
while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get()))
{
@@ -2447,4 +2307,54 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
_maximumDeliveryCount = maximumDeliveryCount;
}
+ /**
+ * Checks if there is any notification to send to the listeners
+ */
+ private void checkForNotification(ServerMessage<?> msg) throws AMQException
+ {
+ final Set<NotificationCheck> notificationChecks = getNotificationChecks();
+ final AMQQueue.NotificationListener listener = _notificationListener;
+
+ if(listener != null && !notificationChecks.isEmpty())
+ {
+ final long currentTime = System.currentTimeMillis();
+ final long thresholdTime = currentTime - getMinimumAlertRepeatGap();
+
+ for (NotificationCheck check : notificationChecks)
+ {
+ if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
+ {
+ if (check.notifyIfNecessary(msg, this, listener))
+ {
+ _lastNotificationTimes[check.ordinal()] = currentTime;
+ }
+ }
+ }
+ }
+ }
+
+ public void setNotificationListener(AMQQueue.NotificationListener listener)
+ {
+ _notificationListener = listener;
+ }
+
+ @Override
+ public void setDescription(String description)
+ {
+ if (description == null)
+ {
+ _arguments.remove(AMQQueueFactory.X_QPID_DESCRIPTION);
+ }
+ else
+ {
+ _arguments.put(AMQQueueFactory.X_QPID_DESCRIPTION, description);
+ }
+ }
+
+ @Override
+ public String getDescription()
+ {
+ return (String) _arguments.get(AMQQueueFactory.X_QPID_DESCRIPTION);
+ }
+
}