diff options
Diffstat (limited to 'java')
6 files changed, 126 insertions, 40 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 561b719b2e..101a2833a0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -373,6 +373,15 @@ public class AMQQueue implements Managable, Comparable debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this); Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal); + + if(subscription.hasFilters()) + { + if (_deliveryMgr.hasQueuedMessages()) + { + _deliveryMgr.populatePreDeliveryQueue(subscription); + } + } + _subscribers.addSubscriber(subscription); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java index f9c8898182..022d3b9635 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java @@ -198,6 +198,11 @@ public class ConcurrentDeliveryManager implements DeliveryManager return new ArrayList<AMQMessage>(_messages); } + public void populatePreDeliveryQueue(Subscription subscription) + { + //no-op . This DM has no PreDeliveryQueues + } + public synchronized void removeAMessageFromTop() throws AMQException { AMQMessage msg = poll(); @@ -312,7 +317,6 @@ public class ConcurrentDeliveryManager implements DeliveryManager else { s.send(msg, _queue); - msg.setDeliveredToConsumer(); } } finally diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 8bdadcb493..f99f2d78b7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -148,6 +148,25 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return new ArrayList<AMQMessage>(_messages); } + public void populatePreDeliveryQueue(Subscription subscription) + { + if (_log.isTraceEnabled()) + { + _log.trace("Populating PreDeliveryQueue for Subscription(" + System.identityHashCode(subscription) + ")"); + } + + Iterator<AMQMessage> currentQueue = _messages.iterator(); + + while (currentQueue.hasNext()) + { + AMQMessage message = currentQueue.next(); + if (subscription.hasInterest(message)) + { + subscription.enqueueForPreDelivery(message); + } + } + } + public synchronized void removeAMessageFromTop() throws AMQException { AMQMessage msg = poll(); @@ -197,7 +216,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _log.info("Async Delivery Message:" + message + " to :" + sub); sub.send(message, queue); - message.setDeliveredToConsumer(); //remove sent message from our queue. messageQueue.poll(); @@ -220,6 +238,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager while (hasSubscribers && hasQueuedMessages()) { + hasSubscribers = false; + for (Subscription sub : _subscriptions.getSubscriptions()) { if (!sub.isSuspended()) @@ -232,13 +252,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { sendNextMessage(sub, _messages, _queue); } - + hasSubscribers = true; } - else - { - hasSubscribers = false; - } } } } @@ -250,7 +266,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public void deliver(String name, AMQMessage msg) throws FailedDequeueException { - _log.info("deliver :" + msg); + _log.info(id() + "deliver :" + System.identityHashCode(msg)); //Check if we have someone to deliver the message to. _lock.lock(); @@ -260,7 +276,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (s == null) //no-one can take the message right now. { - _log.info("Testing Message(" + msg + ") for Queued Delivery"); + _log.info(id() + "Testing Message(" + System.identityHashCode(msg) + ") for Queued Delivery"); if (!msg.isImmediate()) { addMessageToQueue(msg); @@ -269,20 +285,21 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _lock.unlock(); //Pre Deliver to all subscriptions - _log.info("We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to."); + _log.info(id() + "We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to."); for (Subscription sub : _subscriptions.getSubscriptions()) { // stop if the message gets delivered whilst PreDelivering if we have a shared queue. if (_queue.isShared() && msg.getDeliveredToConsumer()) { - _log.info("Stopping PreDelivery as message(" + msg + ") is already delivered."); + _log.info(id() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) + ") is already delivered."); continue; } // Only give the message to those that want them. if (sub.hasInterest(msg)) { + _log.info(id() + "Queuing message(" + System.identityHashCode(msg) + ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")"); sub.enqueueForPreDelivery(msg); } } @@ -293,10 +310,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //release lock now _lock.unlock(); - _log.info("Delivering Message:" + msg + " to(" + System.identityHashCode(s) + ") :" + s); + _log.info(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + System.identityHashCode(s) + ") :" + s); //Deliver the message s.send(msg, _queue); - msg.setDeliveredToConsumer(); } } finally @@ -309,6 +325,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } + //fixme remove + private final String id = "(" + String.valueOf(System.identityHashCode(this)) + ")"; + + private String id() + { + return id; + } + Runner asyncDelivery = new Runner(); private class Runner implements Runnable diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index dadf86c1d8..cac499587f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -73,4 +73,6 @@ interface DeliveryManager void clearAllMessages() throws AMQException; List<AMQMessage> getMessages(); + + void populatePreDeliveryQueue(Subscription subscription); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index fc00754cda..f4e7482396 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -160,32 +160,39 @@ public class SubscriptionImpl implements Subscription { if (msg != null) { - // if we do not need to wait for client acknowledgements - // we can decrement the reference count immediately. - - // By doing this _before_ the send we ensure that it - // doesn't get sent if it can't be dequeued, preventing - // duplicate delivery on recovery. - - // The send may of course still fail, in which case, as - // the message is unacked, it will be lost. - if (!_acks) - { - queue.dequeue(msg); - } - synchronized(channel) + try { - long deliveryTag = channel.getNextDeliveryTag(); + // if we do not need to wait for client acknowledgements + // we can decrement the reference count immediately. - if (_acks) + // By doing this _before_ the send we ensure that it + // doesn't get sent if it can't be dequeued, preventing + // duplicate delivery on recovery. + + // The send may of course still fail, in which case, as + // the message is unacked, it will be lost. + if (!_acks) { - channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); + queue.dequeue(msg); } + synchronized(channel) + { + long deliveryTag = channel.getNextDeliveryTag(); - ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); - AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); + if (_acks) + { + channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); + } - protocolSession.writeFrame(frame); + ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); + AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); + + protocolSession.writeFrame(frame); + } + } + finally + { + msg.setDeliveredToConsumer(); } } else @@ -218,19 +225,55 @@ public class SubscriptionImpl implements Subscription { if (_noLocal) { - return !(protocolSession.getClientProperties().get(ClientProperties.instance.toString()).equals( - msg.getPublisher().getClientProperties().get(ClientProperties.instance.toString()))); + // We don't want local messages so check to see if message is one we sent + if (protocolSession.getClientProperties().get(ClientProperties.instance.toString()).equals( + msg.getPublisher().getClientProperties().get(ClientProperties.instance.toString()))) + { + if (_logger.isTraceEnabled()) + { + _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" + + System.identityHashCode(msg) + ")"); + } + return false; + } + else // if not then filter the message. + { + if (_logger.isTraceEnabled()) + { + _logger.trace("(" + System.identityHashCode(this) + ") local message(" + System.identityHashCode(msg) + + ") but not ours so filtering"); + } + return checkFilters(msg); + } } else { - if (_filters != null) + if (_logger.isTraceEnabled()) { - return _filters.allAllow(msg); + _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg)); } - else + return checkFilters(msg); + } + } + + private boolean checkFilters(AMQMessage msg) + { + if (_filters != null) + { + if (_logger.isTraceEnabled()) { - return true; + _logger.trace("(" + System.identityHashCode(this) + ") has filters."); } + return _filters.allAllow(msg); + } + else + { + if (_logger.isTraceEnabled()) + { + _logger.trace("(" + System.identityHashCode(this) + ") has no filters"); + } + + return true; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java index 49b0111b67..c967ea2cde 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java @@ -122,6 +122,11 @@ class SynchronizedDeliveryManager implements DeliveryManager return new ArrayList<AMQMessage>(_messages); } + public void populatePreDeliveryQueue(Subscription subscription) + { + //no-op . This DM has no PreDeliveryQueues + } + public synchronized void removeAMessageFromTop() throws AMQException { AMQMessage msg = poll(); @@ -243,7 +248,6 @@ class SynchronizedDeliveryManager implements DeliveryManager else { s.send(msg, _queue); - msg.setDeliveredToConsumer(); } } } |