summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java48
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java95
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java6
6 files changed, 126 insertions, 40 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 561b719b2e..101a2833a0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -373,6 +373,15 @@ public class AMQQueue implements Managable, Comparable
debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal);
+
+ if(subscription.hasFilters())
+ {
+ if (_deliveryMgr.hasQueuedMessages())
+ {
+ _deliveryMgr.populatePreDeliveryQueue(subscription);
+ }
+ }
+
_subscribers.addSubscriber(subscription);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
index f9c8898182..022d3b9635 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
@@ -198,6 +198,11 @@ public class ConcurrentDeliveryManager implements DeliveryManager
return new ArrayList<AMQMessage>(_messages);
}
+ public void populatePreDeliveryQueue(Subscription subscription)
+ {
+ //no-op . This DM has no PreDeliveryQueues
+ }
+
public synchronized void removeAMessageFromTop() throws AMQException
{
AMQMessage msg = poll();
@@ -312,7 +317,6 @@ public class ConcurrentDeliveryManager implements DeliveryManager
else
{
s.send(msg, _queue);
- msg.setDeliveredToConsumer();
}
}
finally
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 8bdadcb493..f99f2d78b7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -148,6 +148,25 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return new ArrayList<AMQMessage>(_messages);
}
+ public void populatePreDeliveryQueue(Subscription subscription)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Populating PreDeliveryQueue for Subscription(" + System.identityHashCode(subscription) + ")");
+ }
+
+ Iterator<AMQMessage> currentQueue = _messages.iterator();
+
+ while (currentQueue.hasNext())
+ {
+ AMQMessage message = currentQueue.next();
+ if (subscription.hasInterest(message))
+ {
+ subscription.enqueueForPreDelivery(message);
+ }
+ }
+ }
+
public synchronized void removeAMessageFromTop() throws AMQException
{
AMQMessage msg = poll();
@@ -197,7 +216,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_log.info("Async Delivery Message:" + message + " to :" + sub);
sub.send(message, queue);
- message.setDeliveredToConsumer();
//remove sent message from our queue.
messageQueue.poll();
@@ -220,6 +238,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
while (hasSubscribers && hasQueuedMessages())
{
+ hasSubscribers = false;
+
for (Subscription sub : _subscriptions.getSubscriptions())
{
if (!sub.isSuspended())
@@ -232,13 +252,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
sendNextMessage(sub, _messages, _queue);
}
-
+
hasSubscribers = true;
}
- else
- {
- hasSubscribers = false;
- }
}
}
}
@@ -250,7 +266,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public void deliver(String name, AMQMessage msg) throws FailedDequeueException
{
- _log.info("deliver :" + msg);
+ _log.info(id() + "deliver :" + System.identityHashCode(msg));
//Check if we have someone to deliver the message to.
_lock.lock();
@@ -260,7 +276,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (s == null) //no-one can take the message right now.
{
- _log.info("Testing Message(" + msg + ") for Queued Delivery");
+ _log.info(id() + "Testing Message(" + System.identityHashCode(msg) + ") for Queued Delivery");
if (!msg.isImmediate())
{
addMessageToQueue(msg);
@@ -269,20 +285,21 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_lock.unlock();
//Pre Deliver to all subscriptions
- _log.info("We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to.");
+ _log.info(id() + "We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to.");
for (Subscription sub : _subscriptions.getSubscriptions())
{
// stop if the message gets delivered whilst PreDelivering if we have a shared queue.
if (_queue.isShared() && msg.getDeliveredToConsumer())
{
- _log.info("Stopping PreDelivery as message(" + msg + ") is already delivered.");
+ _log.info(id() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) + ") is already delivered.");
continue;
}
// Only give the message to those that want them.
if (sub.hasInterest(msg))
{
+ _log.info(id() + "Queuing message(" + System.identityHashCode(msg) + ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
sub.enqueueForPreDelivery(msg);
}
}
@@ -293,10 +310,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
//release lock now
_lock.unlock();
- _log.info("Delivering Message:" + msg + " to(" + System.identityHashCode(s) + ") :" + s);
+ _log.info(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + System.identityHashCode(s) + ") :" + s);
//Deliver the message
s.send(msg, _queue);
- msg.setDeliveredToConsumer();
}
}
finally
@@ -309,6 +325,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
+ //fixme remove
+ private final String id = "(" + String.valueOf(System.identityHashCode(this)) + ")";
+
+ private String id()
+ {
+ return id;
+ }
+
Runner asyncDelivery = new Runner();
private class Runner implements Runnable
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
index dadf86c1d8..cac499587f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
@@ -73,4 +73,6 @@ interface DeliveryManager
void clearAllMessages() throws AMQException;
List<AMQMessage> getMessages();
+
+ void populatePreDeliveryQueue(Subscription subscription);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index fc00754cda..f4e7482396 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -160,32 +160,39 @@ public class SubscriptionImpl implements Subscription
{
if (msg != null)
{
- // if we do not need to wait for client acknowledgements
- // we can decrement the reference count immediately.
-
- // By doing this _before_ the send we ensure that it
- // doesn't get sent if it can't be dequeued, preventing
- // duplicate delivery on recovery.
-
- // The send may of course still fail, in which case, as
- // the message is unacked, it will be lost.
- if (!_acks)
- {
- queue.dequeue(msg);
- }
- synchronized(channel)
+ try
{
- long deliveryTag = channel.getNextDeliveryTag();
+ // if we do not need to wait for client acknowledgements
+ // we can decrement the reference count immediately.
- if (_acks)
+ // By doing this _before_ the send we ensure that it
+ // doesn't get sent if it can't be dequeued, preventing
+ // duplicate delivery on recovery.
+
+ // The send may of course still fail, in which case, as
+ // the message is unacked, it will be lost.
+ if (!_acks)
{
- channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ queue.dequeue(msg);
}
+ synchronized(channel)
+ {
+ long deliveryTag = channel.getNextDeliveryTag();
- ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
- AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
+ if (_acks)
+ {
+ channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ }
- protocolSession.writeFrame(frame);
+ ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
+ AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
+
+ protocolSession.writeFrame(frame);
+ }
+ }
+ finally
+ {
+ msg.setDeliveredToConsumer();
}
}
else
@@ -218,19 +225,55 @@ public class SubscriptionImpl implements Subscription
{
if (_noLocal)
{
- return !(protocolSession.getClientProperties().get(ClientProperties.instance.toString()).equals(
- msg.getPublisher().getClientProperties().get(ClientProperties.instance.toString())));
+ // We don't want local messages so check to see if message is one we sent
+ if (protocolSession.getClientProperties().get(ClientProperties.instance.toString()).equals(
+ msg.getPublisher().getClientProperties().get(ClientProperties.instance.toString())))
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
+ System.identityHashCode(msg) + ")");
+ }
+ return false;
+ }
+ else // if not then filter the message.
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") local message(" + System.identityHashCode(msg) +
+ ") but not ours so filtering");
+ }
+ return checkFilters(msg);
+ }
}
else
{
- if (_filters != null)
+ if (_logger.isTraceEnabled())
{
- return _filters.allAllow(msg);
+ _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg));
}
- else
+ return checkFilters(msg);
+ }
+ }
+
+ private boolean checkFilters(AMQMessage msg)
+ {
+ if (_filters != null)
+ {
+ if (_logger.isTraceEnabled())
{
- return true;
+ _logger.trace("(" + System.identityHashCode(this) + ") has filters.");
}
+ return _filters.allAllow(msg);
+ }
+ else
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") has no filters");
+ }
+
+ return true;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
index 49b0111b67..c967ea2cde 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
@@ -122,6 +122,11 @@ class SynchronizedDeliveryManager implements DeliveryManager
return new ArrayList<AMQMessage>(_messages);
}
+ public void populatePreDeliveryQueue(Subscription subscription)
+ {
+ //no-op . This DM has no PreDeliveryQueues
+ }
+
public synchronized void removeAMessageFromTop() throws AMQException
{
AMQMessage msg = poll();
@@ -243,7 +248,6 @@ class SynchronizedDeliveryManager implements DeliveryManager
else
{
s.send(msg, _queue);
- msg.setDeliveredToConsumer();
}
}
}