summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java183
1 files changed, 110 insertions, 73 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 7dfcae95c3..becd57752e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -26,6 +26,7 @@ import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
@@ -47,7 +48,7 @@ import java.util.concurrent.locks.ReentrantLock;
/** Manages delivery of messages on behalf of a queue */
-public class ConcurrentSelectorDeliveryManager implements DeliveryManager
+public class ConcurrentSelectorDeliveryManager
{
private static final Logger _log = Logger.getLogger(ConcurrentSelectorDeliveryManager.class);
@@ -60,13 +61,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
/** Ensures that only one asynchronous task is running for this manager at any time. */
private final AtomicBoolean _processing = new AtomicBoolean();
/** The subscriptions on the queue to whom messages are delivered */
- private final SubscriptionManager _subscriptions;
+ private final SubscriptionSet _subscriptions = new SubscriptionSet();
/**
* A reference to the queue we are delivering messages for. We need this to be able to pass the code that handles
* acknowledgements a handle on the queue.
*/
- private final AMQQueue _queue;
+ private final AMQQueueImpl _queue;
/**
* Flag used while moving messages from this queue to another. For moving messages the async delivery should also
@@ -83,7 +84,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
private ReentrantLock _lock = new ReentrantLock();
private AtomicLong _totalMessageSize = new AtomicLong();
private AtomicInteger _extraMessages = new AtomicInteger();
- private Set<Subscription> _hasContent = Collections.synchronizedSet(new HashSet<Subscription>());
+ private Set<DeliveryAgent> _hasContent = Collections.synchronizedSet(new HashSet<DeliveryAgent>());
private final Object _queueHeadLock = new Object();
private String _processingThreadName = "";
@@ -91,7 +92,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
/** Used by any reaping thread to purge messages */
private StoreContext _reapingStoreContext = new StoreContext();
- ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
+ ConcurrentSelectorDeliveryManager(AMQQueueImpl queue)
{
//Set values from configuration
@@ -102,11 +103,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_log.warn("Compressing Buffers on queue.");
}
- _subscriptions = subscriptions;
_queue = queue;
}
+ public SubscriptionSet getSubscribers()
+ {
+ return _subscriptions;
+ }
+
+
private boolean addMessageToQueue(QueueEntry entry, boolean deliverFirst)
{
AMQMessage msg = entry.getMessage();
@@ -182,13 +188,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public void subscriberHasPendingResend(boolean hasContent, Subscription subscription, QueueEntry entry)
{
+ subscriberHasPendingResend(hasContent,_subscriptions.getDeliveryAgent(subscription),entry);
+ }
+ private void subscriberHasPendingResend(boolean hasContent, DeliveryAgent deliveryAgent, QueueEntry entry)
+ {
_lock.lock();
try
{
if (hasContent)
{
_log.debug("Queue has adding subscriber content");
- _hasContent.add(subscription);
+ _hasContent.add(deliveryAgent);
_totalMessageSize.addAndGet(entry.getSize());
_extraMessages.addAndGet(1);
}
@@ -197,7 +207,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_log.debug("Queue has removing subscriber content");
if (entry == null)
{
- _hasContent.remove(subscription);
+ _hasContent.remove(deliveryAgent);
}
else
{
@@ -302,20 +312,21 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public void populatePreDeliveryQueue(Subscription subscription)
{
- if (_log.isDebugEnabled())
- {
- _log.debug("Populating PreDeliveryQueue for Subscription(" + System.identityHashCode(subscription) + ")");
- }
+ populatePreDeliveryQueue(_subscriptions.getDeliveryAgent(subscription));
+ }
+ private void populatePreDeliveryQueue(DeliveryAgent deliveryAgent)
+ {
+
Iterator<QueueEntry> currentQueue = _messages.iterator();
while (currentQueue.hasNext())
{
QueueEntry entry = currentQueue.next();
- if (subscription.hasInterest(entry))
+ if (deliveryAgent != null && deliveryAgent.hasInterest(entry))
{
- subscription.enqueueForPreDelivery(entry, false);
+ deliveryAgent.enqueueForPreDelivery(entry, false);
}
}
@@ -348,7 +359,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
_log.debug("No ack mode so dequeuing message immediately: " + entry.getMessage().getMessageId());
}
- _queue.dequeue(channel.getStoreContext(), entry);
+ entry.dequeue(channel.getStoreContext());
}
synchronized (channel)
{
@@ -367,12 +378,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (!acks)
{
- entry.getMessage().decrementReference(channel.getStoreContext());
+ entry.dispose(channel.getStoreContext());
}
}
finally
{
- entry.setDeliveredToConsumer();
+ entry.setDeliveredToSubscription();
}
return true;
@@ -414,9 +425,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
for (Subscription sub : _subscriptions.getSubscriptions())
{
- if (!sub.isSuspended() && sub.filtersMessages())
+ DeliveryAgent deliveryAgent = _subscriptions.getDeliveryAgent(sub);
+ if (deliveryAgent != null && !sub.isSuspended() && sub.filtersMessages())
{
- Queue<QueueEntry> preDeliveryQueue = sub.getPreDeliveryQueue();
+ Queue<QueueEntry> preDeliveryQueue = deliveryAgent.getPreDeliveryQueue();
for (QueueEntry entry : messageList)
{
preDeliveryQueue.remove(entry);
@@ -449,12 +461,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (entry != null)
{
- queue.dequeue(storeContext, entry);
+ entry.dequeue(storeContext);
_totalMessageSize.addAndGet(-entry.getSize());
//If this causes ref count to hit zero then data will be purged so message.getSize() will NPE.
- entry.getMessage().decrementReference(storeContext);
+ entry.dispose(storeContext);
}
@@ -474,9 +486,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
//and remove it
_messages.poll();
- _queue.dequeue(storeContext, entry);
+ entry.dequeue(storeContext);
- entry.getMessage().decrementReference(_reapingStoreContext);
+ entry.dispose(storeContext);
entry = getNextMessage();
count++;
@@ -506,7 +518,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
//while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.)
while (purgeMessage(entry, sub, purgeOnly))
{
- AMQMessage message = entry.getMessage();
//remove the already taken message or expired
QueueEntry removed = messages.poll();
@@ -514,14 +525,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
assert removed == entry;
// if the message expired then the _totalMessageSize needs adjusting
- if (message.expired(_queue) && !entry.taken(sub))
+ if (entry.expired() && !entry.acquire(sub))
{
_totalMessageSize.addAndGet(-entry.getSize());
// Use the reapingStoreContext as any sub(if we have one) may be in a tx.
- _queue.dequeue(_reapingStoreContext, entry);
-
- message.decrementReference(_reapingStoreContext);
+ entry.dequeue(_reapingStoreContext);
+ entry.dispose(_reapingStoreContext);
if (_log.isInfoEnabled())
{
@@ -534,7 +544,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (_log.isDebugEnabled())
{
- _log.debug("Removed taken message:" + message.debugIdentity());
+ _log.debug("Removed taken message:" + entry.debugIdentity());
}
// try the next message
@@ -546,24 +556,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
/**
* This method will return true if the message is to be purged from the queue.
- *
- *
- * SIDE-EFFECT: The message will be taken by the Subscription(sub) for the current Queue(_queue)
- *
- * @param message
- * @param sub
- *
- * @return
- *
- * @throws AMQException
- */
- private boolean purgeMessage(QueueEntry message, Subscription sub) throws AMQException
- {
- return purgeMessage(message, sub, false);
- }
-
- /**
- * This method will return true if the message is to be purged from the queue.
* \
* SIDE-EFFECT: The msg will be taken by the Subscription(sub) for the current Queue(_queue) when purgeOnly is false
*
@@ -599,13 +591,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (sub != null)
{
// if we have a queue browser(we don't purge) so check mark the message as taken
- purge = ((!sub.isBrowser() || message.isTaken()));
+ purge = ((!sub.isBrowser() || message.isAcquired()));
}
else
{
// if there is no subscription we are doing
// a get or purging so mark message as taken.
- message.isTaken();
+ message.isAcquired();
// and then ensure that it gets purged
purge = true;
}
@@ -615,17 +607,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
// If we are simply purging the queue don't take the message
// just purge up to the next non-taken msg.
- return purge && message.isTaken();
+ return purge && message.isAcquired();
}
else
{
// if we are purging then ensure we mark this message taken for the current subscriber
// the current subscriber may be null in the case of a get or a purge but this is ok.
- return purge && message.taken(sub);
+ return purge && message.acquire(sub);
}
}
- public void sendNextMessage(Subscription sub, AMQQueue queue)
+ public void sendNextMessage(DeliveryAgent sub)
{
Queue<QueueEntry> messageQueue = sub.getNextQueue(_messages);
@@ -634,7 +626,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
_log.debug(debugIdentity() + "Async sendNextMessage for sub (" + System.identityHashCode(sub) +
") from queue (" + System.identityHashCode(messageQueue) +
- ") AMQQueue (" + System.identityHashCode(queue) + ")");
+ ") AMQQueue (" + System.identityHashCode(_queue) + ")");
}
if (messageQueue == null)
@@ -642,18 +634,18 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
// There is no queue with messages currently. This is ok... just means the queue has no msgs matching selector
if (_log.isInfoEnabled())
{
- _log.info(debugIdentity() + sub + ": asked to send messages but has none on given queue:" + queue);
+ _log.info(debugIdentity() + sub + ": asked to send messages but has none on given queue:" + _queue);
}
return;
}
QueueEntry entry = null;
- QueueEntry removed = null;
+ QueueEntry removed;
try
{
synchronized (_queueHeadLock)
{
- entry = getNextMessage(messageQueue, sub, false);
+ entry = getNextMessage(messageQueue, sub.getSubscription(), false);
// message will be null if we have no messages in the messageQueue.
if (entry == null)
@@ -677,7 +669,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_totalMessageSize.addAndGet(-entry.getSize());
}
- sub.send(entry, _queue);
+ sub.send(entry);
//remove sent message from our queue.
removed = messageQueue.poll();
@@ -739,11 +731,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
- private void cleanMainQueue(Subscription sub)
+ private void cleanMainQueue(DeliveryAgent sub)
{
try
{
- getNextMessage(_messages, sub, true);
+ getNextMessage(_messages, sub.getSubscription(), true);
}
catch (AMQException e)
{
@@ -773,7 +765,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
// Only give the message to those that want them.
if (sub.hasInterest(entry))
{
- sub.enqueueForPreDelivery(entry, true);
+ DeliveryAgent deliveryAgent = _subscriptions.getDeliveryAgent(sub);
+ if(deliveryAgent != null)
+ {
+ deliveryAgent.enqueueForPreDelivery(entry, true);
+ }
}
}
}
@@ -806,13 +802,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
for (Subscription sub : _subscriptions.getSubscriptions())
{
- synchronized (sub.getSendLock())
+ DeliveryAgent da = _subscriptions.getDeliveryAgent(sub);
+ if(da != null)
{
- if (!sub.isSuspended())
+ synchronized (da.getSendLock())
{
- sendNextMessage(sub, _queue);
+ if (!sub.isSuspended())
+ {
+ sendNextMessage(da);
- hasSubscribers = true;
+ hasSubscribers = true;
+ }
}
}
}
@@ -839,6 +839,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
try
{
Subscription s = _subscriptions.nextSubscriber(entry);
+ DeliveryAgent da = (s==null) ? null : _subscriptions.getDeliveryAgent(s);
+
if (s == null || (!s.filtersMessages() && hasQueuedMessages())) //no-one can take the message right now or we're queueing
{
@@ -861,16 +863,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
for (Subscription sub : _subscriptions.getSubscriptions())
{
-
+ DeliveryAgent deliveryAgent = (s==null) ? null : _subscriptions.getDeliveryAgent(sub);
// Only give the message to those that want them.
- if (sub.hasInterest(entry))
+ if (deliveryAgent != null && sub.hasInterest(entry))
{
if (debugEnabled)
{
_log.debug(debugIdentity() + "Queuing message(" + System.identityHashCode(entry) +
") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
}
- sub.enqueueForPreDelivery(entry, deliverFirst);
+ deliveryAgent.enqueueForPreDelivery(entry, deliverFirst);
}
}
@@ -887,9 +889,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (s.filtersMessages())
{
- if (s.getPreDeliveryQueue().size() > 0)
+ if (da.getPreDeliveryQueue().size() > 0)
{
- _log.error("Direct delivery from PDQ with queued msgs:" + s.getPreDeliveryQueue().size());
+ _log.error("Direct delivery from PDQ with queued msgs:" + da.getPreDeliveryQueue().size());
}
}
else if (_messages.size() > 0)
@@ -899,7 +901,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
//release lock now
_lock.unlock();
- synchronized (s.getSendLock())
+ Object sendLock = (da == null) ? new Object() : da.getSendLock();
+ synchronized (sendLock)
{
if (!s.isSuspended())
{
@@ -909,7 +912,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
System.identityHashCode(s) + ") :" + s);
}
- if (entry.taken(s))
+ if (entry.acquire(s))
{
//Message has been delivered so don't redeliver.
// This can currently occur because of the recursive call below
@@ -939,7 +942,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return;
}
//Deliver the message
- s.send(entry, _queue);
+ da.send(entry);
}
else
{
@@ -955,7 +958,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
// Why do we do this? What was the reasoning? We should have a better approach
// than recursion and rejecting if someone else sends it before we do.
//
- if (!entry.isTaken())
+ if (!entry.isAcquired())
{
if (debugEnabled)
{
@@ -996,6 +999,23 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
final Runner _asyncDelivery = new Runner();
+ public void debug()
+ {
+ System.err.println(_extraMessages);
+ System.err.println(_messages);
+
+ }
+
+ public void start(final Subscription subscription)
+ {
+ DeliveryAgent da = _subscriptions.getDeliveryAgent(subscription);
+ if(da != null)
+ {
+ da.start();
+ }
+ }
+
+
private class Runner implements Runnable
{
public void run()
@@ -1047,6 +1067,21 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
+ public void closeSubscription(final Subscription subscription)
+ {
+ _subscriptions.getDeliveryAgent(subscription).close();
+ subscription.close();
+ }
+
+
+ public void resend(final QueueEntry entry, final Subscription subscription)
+ {
+ final DeliveryAgent deliveryAgent = _subscriptions.getDeliveryAgent(subscription);
+ deliveryAgent.addToResendQueue(entry);
+
+ }
+
+
private String currentStatus()
{
return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(H:M)") +
@@ -1058,4 +1093,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
" Processing:" + (_processing.get() ? " true : Processing Thread: " + _processingThreadName : " false");
}
+
+
}