diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java | 25 |
1 files changed, 21 insertions, 4 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index f09e8213b1..9efeb8351c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -34,6 +34,7 @@ import java.util.Queue; import java.util.concurrent.Executor; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; /** @@ -76,7 +77,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue. */ private ReentrantLock _lock = new ReentrantLock(); - + private AtomicLong _totalMessageSize = new AtomicLong(); ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) { @@ -109,6 +110,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _messages.offer(msg); + _totalMessageSize.addAndGet(msg.getSize()); + return true; } @@ -142,6 +145,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return _messages.size(); } + public long getTotalMessageSize() + { + return _totalMessageSize.get(); + } + + public long getOldestMessageArrival() + { + AMQMessage msg = _messages.peek(); + return msg == null ? Long.MAX_VALUE : msg.getArrivalTime(); + } + public synchronized List<AMQMessage> getMessages() { @@ -173,6 +187,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (msg != null) { msg.dequeue(_queue); + _totalMessageSize.addAndGet(-msg.getSize()); } } @@ -182,6 +197,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager while (msg != null) { msg.dequeue(_queue); + _totalMessageSize.set(0L); msg = poll(); } } @@ -222,6 +238,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //remove sent message from our queue. messageQueue.poll(); + _totalMessageSize.addAndGet(-message.getSize()); } catch (FailedDequeueException e) { @@ -308,7 +325,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //Pre Deliver to all subscriptions if (_log.isDebugEnabled()) { - _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() + + _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to."); } for (Subscription sub : _subscriptions.getSubscriptions()) @@ -330,7 +347,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) + + _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) + ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")"); } sub.enqueueForPreDelivery(msg); @@ -345,7 +362,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (_log.isDebugEnabled()) { - _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + + _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + System.identityHashCode(s) + ") :" + s); } //Deliver the message |