summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java118
1 files changed, 62 insertions, 56 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 501e90b4d7..63ec56c1af 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -19,7 +19,6 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.pool.ReadWriteRunnable;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
-import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -73,10 +72,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
- private final AtomicInteger _atomicQueueCount = new AtomicInteger(0);
-
- private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
-
private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
@@ -105,6 +100,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
/** the minimum interval between sending out consecutive alerts of the same type */
public long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap();
+
private static final int MAX_ASYNC_DELIVERIES = 10;
private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
@@ -159,7 +155,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
resetNotifications();
-
}
public void resetNotifications()
@@ -188,6 +183,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
return _autoDelete;
}
+ public boolean isFlowed()
+ {
+ return _entries.isFlowed();
+ }
+
public AMQShortString getOwner()
{
return _owner;
@@ -321,10 +321,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException
{
-
- incrementQueueCount();
- incrementQueueSize(message);
-
_totalMessagesReceived.incrementAndGet();
QueueEntry entry;
@@ -421,7 +417,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
deliverAsync();
}
- _managedObject.checkForNotification(entry.getMessage());
+ _managedObject.checkForNotification(entry);
return entry;
}
@@ -465,20 +461,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// Simple Queues don't :-)
}
- private void incrementQueueSize(final AMQMessage message)
- {
- getAtomicQueueSize().addAndGet(message.getSize());
- }
-
- private void incrementQueueCount()
- {
- getAtomicQueueCount().incrementAndGet();
- }
-
private void deliverMessage(final Subscription sub, final QueueEntry entry)
throws AMQException
{
_deliveredMessages.incrementAndGet();
+
sub.send(entry);
}
@@ -573,8 +560,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
*/
public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
{
- decrementQueueCount();
- decrementQueueSize(entry);
if (entry.acquiredBySubscription())
{
_deliveredMessages.decrementAndGet();
@@ -582,10 +567,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
try
{
- AMQMessage msg = entry.getMessage();
- if (msg.isPersistent())
+ if (entry.isPersistent())
{
- _virtualHost.getTransactionLog().dequeueMessage(storeContext, this, msg.getMessageId());
+ _virtualHost.getTransactionLog().dequeueMessage(storeContext, this, entry.getMessageId());
}
}
@@ -604,15 +588,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
- private void decrementQueueSize(final QueueEntry entry)
- {
- getAtomicQueueSize().addAndGet(-entry.getMessage().getSize());
- }
-
- void decrementQueueCount()
- {
- getAtomicQueueCount().decrementAndGet();
- }
public boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException
{
@@ -658,14 +633,19 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
return getMessageCount() == 0;
}
+ public long getMemoryUsageCurrent()
+ {
+ return getQueueInMemory();
+ }
+
public int getMessageCount()
{
- return getAtomicQueueCount().get();
+ return getQueueCount();
}
public long getQueueDepth()
{
- return getAtomicQueueSize().get();
+ return getQueueSize();
}
public int getUndeliveredMessageCount()
@@ -741,14 +721,19 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
return _name.compareTo(o.getName());
}
- public AtomicInteger getAtomicQueueCount()
+ public int getQueueCount()
+ {
+ return _entries.size();
+ }
+
+ public long getQueueSize()
{
- return _atomicQueueCount;
+ return _entries.dataSize();
}
- public AtomicLong getAtomicQueueSize()
+ public long getQueueInMemory()
{
- return _atomicQueueSize;
+ return _entries.memoryUsed();
}
private boolean isExclusiveSubscriber()
@@ -775,7 +760,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public boolean accept(QueueEntry entry)
{
- final long messageId = entry.getMessage().getMessageId();
+ final long messageId = entry.getMessageId();
return messageId >= fromMessageId && messageId <= toMessageId;
}
@@ -794,7 +779,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public boolean accept(QueueEntry entry)
{
- _complete = entry.getMessage().getMessageId() == messageId;
+ _complete = entry.getMessageId() == messageId;
return _complete;
}
@@ -843,7 +828,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public boolean accept(QueueEntry entry)
{
- final long messageId = entry.getMessage().getMessageId();
+ final long messageId = entry.getMessageId();
return (messageId >= fromMessageId)
&& (messageId <= toMessageId)
&& entry.acquire();
@@ -862,11 +847,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// Move the messages in the transaction log.
for (QueueEntry entry : entries)
{
- AMQMessage message = entry.getMessage();
-
- if (message.isPersistent() && toQueue.isDurable())
+ if (entry.isPersistent() && toQueue.isDurable())
{
- transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId());
+ transactionLog.enqueueMessage(storeContext, toQueue, entry.getMessageId());
}
// dequeue will remove the messages from the queue
entry.dequeue(storeContext);
@@ -901,6 +884,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
for (QueueEntry entry : entries)
{
toQueue.enqueue(storeContext, entry.getMessage());
+ // As we only did a dequeue above now that we have moved the message we should perform a delete.
+ // We cannot do this earlier as the message will be lost if flowed.
+ //entry.delete();
}
}
catch (MessageCleanupException e)
@@ -927,7 +913,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public boolean accept(QueueEntry entry)
{
- final long messageId = entry.getMessage().getMessageId();
+ final long messageId = entry.getMessageId();
if ((messageId >= fromMessageId)
&& (messageId <= toMessageId))
{
@@ -953,11 +939,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// Move the messages in on the transaction log.
for (QueueEntry entry : entries)
{
- AMQMessage message = entry.getMessage();
-
- if (!entry.isDeleted() && message.isPersistent() && toQueue.isDurable())
+ if (!entry.isDeleted() && entry.isPersistent() && toQueue.isDurable())
{
- transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId());
+ transactionLog.enqueueMessage(storeContext, toQueue, entry.getMessageId());
}
}
@@ -1016,7 +1000,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
QueueEntry node = queueListIterator.getNode();
- final long messageId = node.getMessage().getMessageId();
+ final long messageId = node.getMessageId();
if ((messageId >= fromMessageId)
&& (messageId <= toMessageId)
@@ -1116,6 +1100,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if (!_stopped.getAndSet(true))
{
ReferenceCountingExecutorService.getInstance().releaseExecutorService();
+ _entries.stop();
}
}
@@ -1451,12 +1436,33 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
else
{
- _managedObject.checkForNotification(node.getMessage());
+ _managedObject.checkForNotification(node);
}
}
}
+
+ public long getMemoryUsageMaximum()
+ {
+ return _entries.getMemoryUsageMaximum();
+ }
+
+ public void setMemoryUsageMaximum(long maximumMemoryUsage)
+ {
+ _entries.setMemoryUsageMaximum(maximumMemoryUsage);
+ }
+
+ public long getMemoryUsageMinimum()
+ {
+ return _entries.getMemoryUsageMinimum();
+ }
+
+ public void setMemoryUsageMinimum(long minimumMemoryUsage)
+ {
+ _entries.setMemoryUsageMinimum(minimumMemoryUsage);
+ }
+
public long getMinimumAlertRepeatGap()
{
return _minimumAlertRepeatGap;
@@ -1597,7 +1603,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
for (int i = 0; i < num && !it.atTail(); i++)
{
it.advance();
- ids.add(it.getNode().getMessage().getMessageId());
+ ids.add(it.getNode().getMessageId());
}
return ids;
}