diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue')
4 files changed, 337 insertions, 76 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 2a692344d0..184504717e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -160,6 +160,17 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> void setMinimumAlertRepeatGap(long value); + long getCapacity(); + + void setCapacity(long capacity); + + + long getFlowResumeCapacity(); + + void setFlowResumeCapacity(long flowResumeCapacity); + + + void deleteMessageFromTop(StoreContext storeContext) throws AMQException; long clearQueue(StoreContext storeContext) throws AMQException; @@ -180,6 +191,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> void stop(); + void checkCapacity(AMQChannel channel); + /** * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription * already exists. diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 7509350e65..267ccf43ea 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -26,11 +26,105 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.Map; +import java.util.HashMap; + public class AMQQueueFactory { public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities"); + private abstract static class QueueProperty + { + + private final AMQShortString _argumentName; + + + public QueueProperty(String argumentName) + { + _argumentName = new AMQShortString(argumentName); + } + + public AMQShortString getArgumentName() + { + return _argumentName; + } + + + public abstract void setPropertyValue(AMQQueue queue, Object value); + + } + + private abstract static class QueueLongProperty extends QueueProperty + { + + public QueueLongProperty(String argumentName) + { + super(argumentName); + } + + public void setPropertyValue(AMQQueue queue, Object value) + { + if(value instanceof Number) + { + setPropertyValue(queue, ((Number)value).longValue()); + } + + } + + abstract void setPropertyValue(AMQQueue queue, long value); + + + } + + private static final QueueProperty[] DECLAREABLE_PROPERTIES = { + new QueueLongProperty("x-qpid-maximum-message-age") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setMaximumMessageAge(value); + } + }, + new QueueLongProperty("x-qpid-maximum-message-size") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setMaximumMessageSize(value); + } + }, + new QueueLongProperty("x-qpid-maximum-message-count") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setMaximumMessageCount(value); + } + }, + new QueueLongProperty("x-qpid-minimum-alert-repeat-gap") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setMinimumAlertRepeatGap(value); + } + }, + new QueueLongProperty("x-qpid-capacity") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setCapacity(value); + } + }, + new QueueLongProperty("x-qpid-flow-resume-capacity") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setFlowResumeCapacity(value); + } + } + + }; + + + public static AMQQueue createAMQQueueImpl(AMQShortString name, boolean durable, AMQShortString owner, @@ -53,6 +147,18 @@ public class AMQQueueFactory //Register the new queue virtualHost.getQueueRegistry().registerQueue(q); q.configure(virtualHost.getConfiguration().getQueueConfiguration(name.asString())); + + if(arguments != null) + { + for(QueueProperty p : DECLAREABLE_PROPERTIES) + { + if(arguments.containsKey(p.getArgumentName())) + { + p.setPropertyValue(q, arguments.get(p.getArgumentName())); + } + } + } + return q; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index dbad5438dc..3b58f05f93 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -42,8 +42,7 @@ public class QueueEntryImpl implements QueueEntry private final SimpleQueueEntryList _queueEntryList; - private AMQMessage _message; - + private final AMQMessage _message; private Set<Subscription> _rejectedBy = null; @@ -177,13 +176,21 @@ public class QueueEntryImpl implements QueueEntry public String debugIdentity() { - return getMessage().debugIdentity(); + AMQMessage message = getMessage(); + if (message == null) + { + return "null"; + } + else + { + return message.debugIdentity(); + } } public boolean immediateAndNotDelivered() { - return _message.immediateAndNotDelivered(); + return getMessage().immediateAndNotDelivered(); } public void setRedelivered(boolean b) @@ -385,4 +392,5 @@ public class QueueEntryImpl implements QueueEntry { return _queueEntryList; } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index b14b92b014..d781dc4dea 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1,11 +1,10 @@ package org.apache.qpid.server.queue; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -35,6 +34,7 @@ import org.apache.qpid.server.logging.subjects.QueueLogSubject; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.messages.QueueMessages; +import org.apache.qpid.server.AMQChannel; /* * @@ -96,6 +96,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private final Executor _asyncDelivery; private final AtomicLong _totalMessagesReceived = new AtomicLong(); + private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>(); + /** max allowed size(KB) of a single message */ public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize(); @@ -122,6 +124,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private LogSubject _logSubject; private LogActor _logActor; + + private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity(); + private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity(); + private final AtomicBoolean _overfull = new AtomicBoolean(false); + protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException { @@ -508,8 +515,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throws AMQException { _deliveredMessages.incrementAndGet(); + if (_logger.isDebugEnabled()) + { + _logger.debug(sub + ": deliverMessage: " + entry.debugIdentity()); + } sub.send(entry); - } private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) @@ -626,6 +636,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throw new FailedDequeueException(_name.toString(), e); } + checkCapacity(); + } private void decrementQueueSize(final QueueEntry entry) @@ -1170,11 +1182,64 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } - public void deliverAsync() + public void checkCapacity(AMQChannel channel) { - _stateChangeCount.incrementAndGet(); + if(_capacity != 0l) + { + if(_atomicQueueSize.get() > _capacity) + { + _overfull.set(true); + //Overfull log message + _logActor.message(_logSubject, QueueMessages.QUE_1003(_atomicQueueSize.get(), _capacity)); + + if(_blockedChannels.putIfAbsent(channel, Boolean.TRUE)==null) + { + channel.block(this); + } + + if(_atomicQueueSize.get() <= _flowResumeCapacity) + { + + //Underfull log message + _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity)); + + channel.unblock(this); + _blockedChannels.remove(channel); + + } + + } + + + + } + } + + private void checkCapacity() + { + if(_capacity != 0L) + { + if(_overfull.get() && _atomicQueueSize.get() <= _flowResumeCapacity) + { + if(_overfull.compareAndSet(true,false)) + {//Underfull log message + _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity)); + } - Runner runner = new Runner(); + + for(AMQChannel c : _blockedChannels.keySet()) + { + c.unblock(this); + _blockedChannels.remove(c); + } + } + } + } + + + public void deliverAsync() + { + Runner runner = new Runner(_stateChangeCount.incrementAndGet()); if (_asynchronousRunner.compareAndSet(null, runner)) { @@ -1187,13 +1252,23 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _asyncDelivery.execute(new SubFlushRunner(sub)); } + private class Runner implements ReadWriteRunnable { + String _name; + public Runner(long count) + { + _name = "QueueRunner-" + count + "-" + _logActor; + } + public void run() { + String originalName = Thread.currentThread().getName(); try { + Thread.currentThread().setName(_name); CurrentActor.set(_logActor); + processQueue(this); } catch (AMQException e) @@ -1203,9 +1278,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener finally { CurrentActor.remove(); + Thread.currentThread().setName(originalName); } - - } public boolean isRead() @@ -1217,6 +1291,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return true; } + + public String toString() + { + return _name; + } } private class SubFlushRunner implements ReadWriteRunnable @@ -1230,27 +1309,36 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void run() { - boolean complete = false; - try - { - CurrentActor.set(_sub.getLogActor()); - complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES)); - } - catch (AMQException e) - { - _logger.error(e); + String originalName = Thread.currentThread().getName(); + try{ + Thread.currentThread().setName("SubFlushRunner-"+_sub); + + boolean complete = false; + try + { + CurrentActor.set(_sub.getLogActor()); + complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES)); + + } + catch (AMQException e) + { + _logger.error(e); + } + finally + { + CurrentActor.remove(); + } + if (!complete && !_sub.isSuspended()) + { + _asyncDelivery.execute(this); + } } finally { - CurrentActor.remove(); - } - if (!complete && !_sub.isSuspended()) - { - _asyncDelivery.execute(this); + Thread.currentThread().setName(originalName); } - } public boolean isRead() @@ -1278,7 +1366,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener try { sub.getSendLock(); - atTail = attemptDelivery(sub); + atTail = attemptDelivery(sub); if (atTail && sub.isAutoClose()) { unregisterSubscription(sub); @@ -1308,63 +1396,81 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return atTail; } + /** + * Attempt delivery for the given subscription. + * + * Looks up the next node for the subscription and attempts to deliver it. + * + * @param sub + * @return true if we have completed all possible deliveries for this sub. + * @throws AMQException + */ private boolean attemptDelivery(Subscription sub) throws AMQException { boolean atTail = false; boolean advanced = false; - boolean subActive = sub.isActive(); + boolean subActive = sub.isActive() && !sub.isSuspended(); if (subActive) { QueueEntry node = moveSubscriptionToNextNode(sub); + if (_logger.isDebugEnabled()) + { + _logger.debug(sub + ": attempting Delivery: " + node.debugIdentity()); + } if (!(node.isAcquired() || node.isDeleted())) { - if (!sub.isSuspended()) + if (sub.hasInterest(node)) { - if (sub.hasInterest(node)) + if (!sub.wouldSuspend(node)) { - if (!sub.wouldSuspend(node)) + if (!sub.isBrowser() && !node.acquire(sub)) { - if (!sub.isBrowser() && !node.acquire(sub)) - { - sub.restoreCredit(node); - } - else + sub.restoreCredit(node); + } + else + { + deliverMessage(sub, node); + + if (sub.isBrowser()) { - deliverMessage(sub, node); + QueueEntry newNode = _entries.next(node); - if (sub.isBrowser()) + if (newNode != null) { - QueueEntry newNode = _entries.next(node); - - if (newNode != null) - { - advanced = true; - sub.setLastSeenEntry(node, newNode); - node = sub.getLastSeenEntry(); - } + advanced = true; + sub.setLastSeenEntry(node, newNode); + node = sub.getLastSeenEntry(); } + } - - } - else // Not enough Credit for message and wouldSuspend - { - //QPID-1187 - Treat the subscription as suspended for this message - // and wait for the message to be removed to continue delivery. - subActive = false; - node.addStateChangeListener(new QueueEntryListener(sub, node)); } + } - else + else // Not enough Credit for message and wouldSuspend { - // this subscription is not interested in this node so we can skip over it - QueueEntry newNode = _entries.next(node); - if (newNode != null) - { - sub.setLastSeenEntry(node, newNode); - } + //QPID-1187 - Treat the subscription as suspended for this message + // and wait for the message to be removed to continue delivery. + + // 2009-09-30 : MR : setting subActive = false only causes, this + // particular delivery attempt to end. This is called from + // flushSubscription and processQueue both of which attempt + // delivery a number of times. Won't a bytes limited + // subscriber with not enough credit for the next message + // create a lot of new QELs? How about a browser that calls + // this method LONG.MAX_LONG times! + subActive = false; + node.addStateChangeListener(new QueueEntryListener(sub, node)); + } + } + else + { + // this subscription is not interested in this node so we can skip over it + QueueEntry newNode = _entries.next(node); + if (newNode != null) + { + sub.setLastSeenEntry(node, newNode); } } - } atTail = (_entries.next(node) == null) && !advanced; } @@ -1409,6 +1515,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } + + if (_logger.isDebugEnabled()) + { + _logger.debug(sub + ": nextNode: " + (node == null ? "null" : node.debugIdentity())); + } + return node; } @@ -1423,6 +1535,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _asynchronousRunner.compareAndSet(runner, null); + // For every message enqueue/requeue the we fire deliveryAsync() which + // increases _stateChangeCount. If _sCC changes whilst we are in our loop + // (detected by setting previousStateChangeCount to stateChangeCount in the loop body) + // then we will continue to run for a maximum of iterations. + // So whilst delivery/rejection is going on a processQueue thread will be running while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner)) { // we want to have one extra loop after every subscription has reached the point where it cannot move @@ -1442,20 +1559,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener //iterate over the subscribers and try to advance their pointer while (subscriptionIter.advance()) { - boolean closeConsumer = false; Subscription sub = subscriptionIter.getNode().getSubscription(); sub.getSendLock(); try { - if (sub != null) - { - - QueueEntry node = moveSubscriptionToNextNode(sub); - if (node != null) - { - done = attemptDelivery(sub); - } - } + done = attemptDelivery(sub); if (done) { if (extraLoops == 0) @@ -1492,11 +1600,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // therefore we should schedule this runner again (unless someone beats us to it :-) ). if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner)) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rescheduling runner:" + runner); + } _asyncDelivery.execute(runner); } } - @Override public void checkMessageStatus() throws AMQException { @@ -1604,6 +1715,27 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } + public long getCapacity() + { + return _capacity; + } + + public void setCapacity(long capacity) + { + _capacity = capacity; + } + + public long getFlowResumeCapacity() + { + return _flowResumeCapacity; + } + + public void setFlowResumeCapacity(long flowResumeCapacity) + { + _flowResumeCapacity = flowResumeCapacity; + } + + public Set<NotificationCheck> getNotificationChecks() { return _notificationChecks; @@ -1673,6 +1805,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener setMaximumMessageSize(config.getMaximumMessageSize()); setMaximumMessageCount(config.getMaximumMessageCount()); setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap()); + _capacity = config.getCapacity(); + _flowResumeCapacity = config.getFlowResumeCapacity(); } } } |