summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java25
1 files changed, 21 insertions, 4 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index f09e8213b1..9efeb8351c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -34,6 +34,7 @@ import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
/**
@@ -76,7 +77,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
* Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue.
*/
private ReentrantLock _lock = new ReentrantLock();
-
+ private AtomicLong _totalMessageSize = new AtomicLong();
ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
{
@@ -109,6 +110,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_messages.offer(msg);
+ _totalMessageSize.addAndGet(msg.getSize());
+
return true;
}
@@ -142,6 +145,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return _messages.size();
}
+ public long getTotalMessageSize()
+ {
+ return _totalMessageSize.get();
+ }
+
+ public long getOldestMessageArrival()
+ {
+ AMQMessage msg = _messages.peek();
+ return msg == null ? Long.MAX_VALUE : msg.getArrivalTime();
+ }
+
public synchronized List<AMQMessage> getMessages()
{
@@ -173,6 +187,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (msg != null)
{
msg.dequeue(_queue);
+ _totalMessageSize.addAndGet(-msg.getSize());
}
}
@@ -182,6 +197,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
while (msg != null)
{
msg.dequeue(_queue);
+ _totalMessageSize.set(0L);
msg = poll();
}
}
@@ -222,6 +238,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
//remove sent message from our queue.
messageQueue.poll();
+ _totalMessageSize.addAndGet(-message.getSize());
}
catch (FailedDequeueException e)
{
@@ -308,7 +325,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
//Pre Deliver to all subscriptions
if (_log.isDebugEnabled())
{
- _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() +
+ _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() +
" subscribers to give the message to.");
}
for (Subscription sub : _subscriptions.getSubscriptions())
@@ -330,7 +347,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) +
+ _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) +
") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
}
sub.enqueueForPreDelivery(msg);
@@ -345,7 +362,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (_log.isDebugEnabled())
{
- _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
+ _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
System.identityHashCode(s) + ") :" + s);
}
//Deliver the message