summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java106
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java278
4 files changed, 337 insertions, 76 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 2a692344d0..184504717e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -160,6 +160,17 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
void setMinimumAlertRepeatGap(long value);
+ long getCapacity();
+
+ void setCapacity(long capacity);
+
+
+ long getFlowResumeCapacity();
+
+ void setFlowResumeCapacity(long flowResumeCapacity);
+
+
+
void deleteMessageFromTop(StoreContext storeContext) throws AMQException;
long clearQueue(StoreContext storeContext) throws AMQException;
@@ -180,6 +191,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
void stop();
+ void checkCapacity(AMQChannel channel);
+
/**
* ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
* already exists.
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index 7509350e65..267ccf43ea 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -26,11 +26,105 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.Map;
+import java.util.HashMap;
+
public class AMQQueueFactory
{
public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
+ private abstract static class QueueProperty
+ {
+
+ private final AMQShortString _argumentName;
+
+
+ public QueueProperty(String argumentName)
+ {
+ _argumentName = new AMQShortString(argumentName);
+ }
+
+ public AMQShortString getArgumentName()
+ {
+ return _argumentName;
+ }
+
+
+ public abstract void setPropertyValue(AMQQueue queue, Object value);
+
+ }
+
+ private abstract static class QueueLongProperty extends QueueProperty
+ {
+
+ public QueueLongProperty(String argumentName)
+ {
+ super(argumentName);
+ }
+
+ public void setPropertyValue(AMQQueue queue, Object value)
+ {
+ if(value instanceof Number)
+ {
+ setPropertyValue(queue, ((Number)value).longValue());
+ }
+
+ }
+
+ abstract void setPropertyValue(AMQQueue queue, long value);
+
+
+ }
+
+ private static final QueueProperty[] DECLAREABLE_PROPERTIES = {
+ new QueueLongProperty("x-qpid-maximum-message-age")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMaximumMessageAge(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-maximum-message-size")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMaximumMessageSize(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-maximum-message-count")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMaximumMessageCount(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-minimum-alert-repeat-gap")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMinimumAlertRepeatGap(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-capacity")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setCapacity(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-flow-resume-capacity")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setFlowResumeCapacity(value);
+ }
+ }
+
+ };
+
+
+
public static AMQQueue createAMQQueueImpl(AMQShortString name,
boolean durable,
AMQShortString owner,
@@ -53,6 +147,18 @@ public class AMQQueueFactory
//Register the new queue
virtualHost.getQueueRegistry().registerQueue(q);
q.configure(virtualHost.getConfiguration().getQueueConfiguration(name.asString()));
+
+ if(arguments != null)
+ {
+ for(QueueProperty p : DECLAREABLE_PROPERTIES)
+ {
+ if(arguments.containsKey(p.getArgumentName()))
+ {
+ p.setPropertyValue(q, arguments.get(p.getArgumentName()));
+ }
+ }
+ }
+
return q;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index dbad5438dc..3b58f05f93 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -42,8 +42,7 @@ public class QueueEntryImpl implements QueueEntry
private final SimpleQueueEntryList _queueEntryList;
- private AMQMessage _message;
-
+ private final AMQMessage _message;
private Set<Subscription> _rejectedBy = null;
@@ -177,13 +176,21 @@ public class QueueEntryImpl implements QueueEntry
public String debugIdentity()
{
- return getMessage().debugIdentity();
+ AMQMessage message = getMessage();
+ if (message == null)
+ {
+ return "null";
+ }
+ else
+ {
+ return message.debugIdentity();
+ }
}
public boolean immediateAndNotDelivered()
{
- return _message.immediateAndNotDelivered();
+ return getMessage().immediateAndNotDelivered();
}
public void setRedelivered(boolean b)
@@ -385,4 +392,5 @@ public class QueueEntryImpl implements QueueEntry
{
return _queueEntryList;
}
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index b14b92b014..d781dc4dea 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -1,11 +1,10 @@
package org.apache.qpid.server.queue;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -35,6 +34,7 @@ import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.messages.QueueMessages;
+import org.apache.qpid.server.AMQChannel;
/*
*
@@ -96,6 +96,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private final Executor _asyncDelivery;
private final AtomicLong _totalMessagesReceived = new AtomicLong();
+ private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>();
+
/** max allowed size(KB) of a single message */
public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize();
@@ -122,6 +124,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private LogSubject _logSubject;
private LogActor _logActor;
+
+ private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity();
+ private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity();
+ private final AtomicBoolean _overfull = new AtomicBoolean(false);
+
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
{
@@ -508,8 +515,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
throws AMQException
{
_deliveredMessages.incrementAndGet();
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(sub + ": deliverMessage: " + entry.debugIdentity());
+ }
sub.send(entry);
-
}
private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry)
@@ -626,6 +636,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
throw new FailedDequeueException(_name.toString(), e);
}
+ checkCapacity();
+
}
private void decrementQueueSize(final QueueEntry entry)
@@ -1170,11 +1182,64 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
- public void deliverAsync()
+ public void checkCapacity(AMQChannel channel)
{
- _stateChangeCount.incrementAndGet();
+ if(_capacity != 0l)
+ {
+ if(_atomicQueueSize.get() > _capacity)
+ {
+ _overfull.set(true);
+ //Overfull log message
+ _logActor.message(_logSubject, QueueMessages.QUE_1003(_atomicQueueSize.get(), _capacity));
+
+ if(_blockedChannels.putIfAbsent(channel, Boolean.TRUE)==null)
+ {
+ channel.block(this);
+ }
+
+ if(_atomicQueueSize.get() <= _flowResumeCapacity)
+ {
+
+ //Underfull log message
+ _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity));
+
+ channel.unblock(this);
+ _blockedChannels.remove(channel);
+
+ }
+
+ }
+
+
+
+ }
+ }
+
+ private void checkCapacity()
+ {
+ if(_capacity != 0L)
+ {
+ if(_overfull.get() && _atomicQueueSize.get() <= _flowResumeCapacity)
+ {
+ if(_overfull.compareAndSet(true,false))
+ {//Underfull log message
+ _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity));
+ }
- Runner runner = new Runner();
+
+ for(AMQChannel c : _blockedChannels.keySet())
+ {
+ c.unblock(this);
+ _blockedChannels.remove(c);
+ }
+ }
+ }
+ }
+
+
+ public void deliverAsync()
+ {
+ Runner runner = new Runner(_stateChangeCount.incrementAndGet());
if (_asynchronousRunner.compareAndSet(null, runner))
{
@@ -1187,13 +1252,23 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_asyncDelivery.execute(new SubFlushRunner(sub));
}
+
private class Runner implements ReadWriteRunnable
{
+ String _name;
+ public Runner(long count)
+ {
+ _name = "QueueRunner-" + count + "-" + _logActor;
+ }
+
public void run()
{
+ String originalName = Thread.currentThread().getName();
try
{
+ Thread.currentThread().setName(_name);
CurrentActor.set(_logActor);
+
processQueue(this);
}
catch (AMQException e)
@@ -1203,9 +1278,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
finally
{
CurrentActor.remove();
+ Thread.currentThread().setName(originalName);
}
-
-
}
public boolean isRead()
@@ -1217,6 +1291,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
return true;
}
+
+ public String toString()
+ {
+ return _name;
+ }
}
private class SubFlushRunner implements ReadWriteRunnable
@@ -1230,27 +1309,36 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public void run()
{
- boolean complete = false;
- try
- {
- CurrentActor.set(_sub.getLogActor());
- complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES));
- }
- catch (AMQException e)
- {
- _logger.error(e);
+ String originalName = Thread.currentThread().getName();
+ try{
+ Thread.currentThread().setName("SubFlushRunner-"+_sub);
+
+ boolean complete = false;
+ try
+ {
+ CurrentActor.set(_sub.getLogActor());
+ complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES));
+
+ }
+ catch (AMQException e)
+ {
+ _logger.error(e);
+ }
+ finally
+ {
+ CurrentActor.remove();
+ }
+ if (!complete && !_sub.isSuspended())
+ {
+ _asyncDelivery.execute(this);
+ }
}
finally
{
- CurrentActor.remove();
- }
- if (!complete && !_sub.isSuspended())
- {
- _asyncDelivery.execute(this);
+ Thread.currentThread().setName(originalName);
}
-
}
public boolean isRead()
@@ -1278,7 +1366,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
try
{
sub.getSendLock();
- atTail = attemptDelivery(sub);
+ atTail = attemptDelivery(sub);
if (atTail && sub.isAutoClose())
{
unregisterSubscription(sub);
@@ -1308,63 +1396,81 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
return atTail;
}
+ /**
+ * Attempt delivery for the given subscription.
+ *
+ * Looks up the next node for the subscription and attempts to deliver it.
+ *
+ * @param sub
+ * @return true if we have completed all possible deliveries for this sub.
+ * @throws AMQException
+ */
private boolean attemptDelivery(Subscription sub) throws AMQException
{
boolean atTail = false;
boolean advanced = false;
- boolean subActive = sub.isActive();
+ boolean subActive = sub.isActive() && !sub.isSuspended();
if (subActive)
{
QueueEntry node = moveSubscriptionToNextNode(sub);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(sub + ": attempting Delivery: " + node.debugIdentity());
+ }
if (!(node.isAcquired() || node.isDeleted()))
{
- if (!sub.isSuspended())
+ if (sub.hasInterest(node))
{
- if (sub.hasInterest(node))
+ if (!sub.wouldSuspend(node))
{
- if (!sub.wouldSuspend(node))
+ if (!sub.isBrowser() && !node.acquire(sub))
{
- if (!sub.isBrowser() && !node.acquire(sub))
- {
- sub.restoreCredit(node);
- }
- else
+ sub.restoreCredit(node);
+ }
+ else
+ {
+ deliverMessage(sub, node);
+
+ if (sub.isBrowser())
{
- deliverMessage(sub, node);
+ QueueEntry newNode = _entries.next(node);
- if (sub.isBrowser())
+ if (newNode != null)
{
- QueueEntry newNode = _entries.next(node);
-
- if (newNode != null)
- {
- advanced = true;
- sub.setLastSeenEntry(node, newNode);
- node = sub.getLastSeenEntry();
- }
+ advanced = true;
+ sub.setLastSeenEntry(node, newNode);
+ node = sub.getLastSeenEntry();
}
+
}
-
- }
- else // Not enough Credit for message and wouldSuspend
- {
- //QPID-1187 - Treat the subscription as suspended for this message
- // and wait for the message to be removed to continue delivery.
- subActive = false;
- node.addStateChangeListener(new QueueEntryListener(sub, node));
}
+
}
- else
+ else // Not enough Credit for message and wouldSuspend
{
- // this subscription is not interested in this node so we can skip over it
- QueueEntry newNode = _entries.next(node);
- if (newNode != null)
- {
- sub.setLastSeenEntry(node, newNode);
- }
+ //QPID-1187 - Treat the subscription as suspended for this message
+ // and wait for the message to be removed to continue delivery.
+
+ // 2009-09-30 : MR : setting subActive = false only causes, this
+ // particular delivery attempt to end. This is called from
+ // flushSubscription and processQueue both of which attempt
+ // delivery a number of times. Won't a bytes limited
+ // subscriber with not enough credit for the next message
+ // create a lot of new QELs? How about a browser that calls
+ // this method LONG.MAX_LONG times!
+ subActive = false;
+ node.addStateChangeListener(new QueueEntryListener(sub, node));
+ }
+ }
+ else
+ {
+ // this subscription is not interested in this node so we can skip over it
+ QueueEntry newNode = _entries.next(node);
+ if (newNode != null)
+ {
+ sub.setLastSeenEntry(node, newNode);
}
}
-
}
atTail = (_entries.next(node) == null) && !advanced;
}
@@ -1409,6 +1515,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(sub + ": nextNode: " + (node == null ? "null" : node.debugIdentity()));
+ }
+
return node;
}
@@ -1423,6 +1535,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_asynchronousRunner.compareAndSet(runner, null);
+ // For every message enqueue/requeue the we fire deliveryAsync() which
+ // increases _stateChangeCount. If _sCC changes whilst we are in our loop
+ // (detected by setting previousStateChangeCount to stateChangeCount in the loop body)
+ // then we will continue to run for a maximum of iterations.
+ // So whilst delivery/rejection is going on a processQueue thread will be running
while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner))
{
// we want to have one extra loop after every subscription has reached the point where it cannot move
@@ -1442,20 +1559,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
//iterate over the subscribers and try to advance their pointer
while (subscriptionIter.advance())
{
- boolean closeConsumer = false;
Subscription sub = subscriptionIter.getNode().getSubscription();
sub.getSendLock();
try
{
- if (sub != null)
- {
-
- QueueEntry node = moveSubscriptionToNextNode(sub);
- if (node != null)
- {
- done = attemptDelivery(sub);
- }
- }
+ done = attemptDelivery(sub);
if (done)
{
if (extraLoops == 0)
@@ -1492,11 +1600,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// therefore we should schedule this runner again (unless someone beats us to it :-) ).
if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rescheduling runner:" + runner);
+ }
_asyncDelivery.execute(runner);
}
}
- @Override
public void checkMessageStatus() throws AMQException
{
@@ -1604,6 +1715,27 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
+ public long getCapacity()
+ {
+ return _capacity;
+ }
+
+ public void setCapacity(long capacity)
+ {
+ _capacity = capacity;
+ }
+
+ public long getFlowResumeCapacity()
+ {
+ return _flowResumeCapacity;
+ }
+
+ public void setFlowResumeCapacity(long flowResumeCapacity)
+ {
+ _flowResumeCapacity = flowResumeCapacity;
+ }
+
+
public Set<NotificationCheck> getNotificationChecks()
{
return _notificationChecks;
@@ -1673,6 +1805,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
setMaximumMessageSize(config.getMaximumMessageSize());
setMaximumMessageCount(config.getMaximumMessageCount());
setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap());
+ _capacity = config.getCapacity();
+ _flowResumeCapacity = config.getFlowResumeCapacity();
}
}
}