summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2006-12-20 12:46:20 +0000
committerMartin Ritchie <ritchiem@apache.org>2006-12-20 12:46:20 +0000
commitbccee9d246c110e095951dc76b3ed09fecd61b63 (patch)
tree9a53599c53dad1fd9e1fb4db4dd1cf8b47d102b8 /java
parent818c4a16d4d301113336fb9e076a88c5ef4f3ebf (diff)
downloadqpid-python-bccee9d246c110e095951dc76b3ed09fecd61b63.tar.gz
QPID-21 outstanding issues:
Fixed an issue where a consumer with no_local set would not have its filters applied to messages. Fixed problem where new consumers would start with an empty PDQ rather than checking the existing queue of messages for messages of interest. AMQQueue.java - Added code check exisiting queue data for messages for the new subscriber with a filter. DeliveryManager.java - added populatePreDeliveryQueue SynchronizedDeliveryManager.java/ConcurrentDeliveryManager.java - implemented new DeliveryManager.java interface SubscriptionImpl.java - fixed issue with no_local subscribers had their filters ignored. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@489070 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java48
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java95
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java6
6 files changed, 126 insertions, 40 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 561b719b2e..101a2833a0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -373,6 +373,15 @@ public class AMQQueue implements Managable, Comparable
debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal);
+
+ if(subscription.hasFilters())
+ {
+ if (_deliveryMgr.hasQueuedMessages())
+ {
+ _deliveryMgr.populatePreDeliveryQueue(subscription);
+ }
+ }
+
_subscribers.addSubscriber(subscription);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
index f9c8898182..022d3b9635 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
@@ -198,6 +198,11 @@ public class ConcurrentDeliveryManager implements DeliveryManager
return new ArrayList<AMQMessage>(_messages);
}
+ public void populatePreDeliveryQueue(Subscription subscription)
+ {
+ //no-op . This DM has no PreDeliveryQueues
+ }
+
public synchronized void removeAMessageFromTop() throws AMQException
{
AMQMessage msg = poll();
@@ -312,7 +317,6 @@ public class ConcurrentDeliveryManager implements DeliveryManager
else
{
s.send(msg, _queue);
- msg.setDeliveredToConsumer();
}
}
finally
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 8bdadcb493..f99f2d78b7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -148,6 +148,25 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return new ArrayList<AMQMessage>(_messages);
}
+ public void populatePreDeliveryQueue(Subscription subscription)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Populating PreDeliveryQueue for Subscription(" + System.identityHashCode(subscription) + ")");
+ }
+
+ Iterator<AMQMessage> currentQueue = _messages.iterator();
+
+ while (currentQueue.hasNext())
+ {
+ AMQMessage message = currentQueue.next();
+ if (subscription.hasInterest(message))
+ {
+ subscription.enqueueForPreDelivery(message);
+ }
+ }
+ }
+
public synchronized void removeAMessageFromTop() throws AMQException
{
AMQMessage msg = poll();
@@ -197,7 +216,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_log.info("Async Delivery Message:" + message + " to :" + sub);
sub.send(message, queue);
- message.setDeliveredToConsumer();
//remove sent message from our queue.
messageQueue.poll();
@@ -220,6 +238,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
while (hasSubscribers && hasQueuedMessages())
{
+ hasSubscribers = false;
+
for (Subscription sub : _subscriptions.getSubscriptions())
{
if (!sub.isSuspended())
@@ -232,13 +252,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
sendNextMessage(sub, _messages, _queue);
}
-
+
hasSubscribers = true;
}
- else
- {
- hasSubscribers = false;
- }
}
}
}
@@ -250,7 +266,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public void deliver(String name, AMQMessage msg) throws FailedDequeueException
{
- _log.info("deliver :" + msg);
+ _log.info(id() + "deliver :" + System.identityHashCode(msg));
//Check if we have someone to deliver the message to.
_lock.lock();
@@ -260,7 +276,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (s == null) //no-one can take the message right now.
{
- _log.info("Testing Message(" + msg + ") for Queued Delivery");
+ _log.info(id() + "Testing Message(" + System.identityHashCode(msg) + ") for Queued Delivery");
if (!msg.isImmediate())
{
addMessageToQueue(msg);
@@ -269,20 +285,21 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_lock.unlock();
//Pre Deliver to all subscriptions
- _log.info("We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to.");
+ _log.info(id() + "We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to.");
for (Subscription sub : _subscriptions.getSubscriptions())
{
// stop if the message gets delivered whilst PreDelivering if we have a shared queue.
if (_queue.isShared() && msg.getDeliveredToConsumer())
{
- _log.info("Stopping PreDelivery as message(" + msg + ") is already delivered.");
+ _log.info(id() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) + ") is already delivered.");
continue;
}
// Only give the message to those that want them.
if (sub.hasInterest(msg))
{
+ _log.info(id() + "Queuing message(" + System.identityHashCode(msg) + ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
sub.enqueueForPreDelivery(msg);
}
}
@@ -293,10 +310,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
//release lock now
_lock.unlock();
- _log.info("Delivering Message:" + msg + " to(" + System.identityHashCode(s) + ") :" + s);
+ _log.info(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + System.identityHashCode(s) + ") :" + s);
//Deliver the message
s.send(msg, _queue);
- msg.setDeliveredToConsumer();
}
}
finally
@@ -309,6 +325,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
+ //fixme remove
+ private final String id = "(" + String.valueOf(System.identityHashCode(this)) + ")";
+
+ private String id()
+ {
+ return id;
+ }
+
Runner asyncDelivery = new Runner();
private class Runner implements Runnable
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
index dadf86c1d8..cac499587f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
@@ -73,4 +73,6 @@ interface DeliveryManager
void clearAllMessages() throws AMQException;
List<AMQMessage> getMessages();
+
+ void populatePreDeliveryQueue(Subscription subscription);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index fc00754cda..f4e7482396 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -160,32 +160,39 @@ public class SubscriptionImpl implements Subscription
{
if (msg != null)
{
- // if we do not need to wait for client acknowledgements
- // we can decrement the reference count immediately.
-
- // By doing this _before_ the send we ensure that it
- // doesn't get sent if it can't be dequeued, preventing
- // duplicate delivery on recovery.
-
- // The send may of course still fail, in which case, as
- // the message is unacked, it will be lost.
- if (!_acks)
- {
- queue.dequeue(msg);
- }
- synchronized(channel)
+ try
{
- long deliveryTag = channel.getNextDeliveryTag();
+ // if we do not need to wait for client acknowledgements
+ // we can decrement the reference count immediately.
- if (_acks)
+ // By doing this _before_ the send we ensure that it
+ // doesn't get sent if it can't be dequeued, preventing
+ // duplicate delivery on recovery.
+
+ // The send may of course still fail, in which case, as
+ // the message is unacked, it will be lost.
+ if (!_acks)
{
- channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ queue.dequeue(msg);
}
+ synchronized(channel)
+ {
+ long deliveryTag = channel.getNextDeliveryTag();
- ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
- AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
+ if (_acks)
+ {
+ channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ }
- protocolSession.writeFrame(frame);
+ ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
+ AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
+
+ protocolSession.writeFrame(frame);
+ }
+ }
+ finally
+ {
+ msg.setDeliveredToConsumer();
}
}
else
@@ -218,19 +225,55 @@ public class SubscriptionImpl implements Subscription
{
if (_noLocal)
{
- return !(protocolSession.getClientProperties().get(ClientProperties.instance.toString()).equals(
- msg.getPublisher().getClientProperties().get(ClientProperties.instance.toString())));
+ // We don't want local messages so check to see if message is one we sent
+ if (protocolSession.getClientProperties().get(ClientProperties.instance.toString()).equals(
+ msg.getPublisher().getClientProperties().get(ClientProperties.instance.toString())))
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
+ System.identityHashCode(msg) + ")");
+ }
+ return false;
+ }
+ else // if not then filter the message.
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") local message(" + System.identityHashCode(msg) +
+ ") but not ours so filtering");
+ }
+ return checkFilters(msg);
+ }
}
else
{
- if (_filters != null)
+ if (_logger.isTraceEnabled())
{
- return _filters.allAllow(msg);
+ _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg));
}
- else
+ return checkFilters(msg);
+ }
+ }
+
+ private boolean checkFilters(AMQMessage msg)
+ {
+ if (_filters != null)
+ {
+ if (_logger.isTraceEnabled())
{
- return true;
+ _logger.trace("(" + System.identityHashCode(this) + ") has filters.");
}
+ return _filters.allAllow(msg);
+ }
+ else
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") has no filters");
+ }
+
+ return true;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
index 49b0111b67..c967ea2cde 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
@@ -122,6 +122,11 @@ class SynchronizedDeliveryManager implements DeliveryManager
return new ArrayList<AMQMessage>(_messages);
}
+ public void populatePreDeliveryQueue(Subscription subscription)
+ {
+ //no-op . This DM has no PreDeliveryQueues
+ }
+
public synchronized void removeAMessageFromTop() throws AMQException
{
AMQMessage msg = poll();
@@ -243,7 +248,6 @@ class SynchronizedDeliveryManager implements DeliveryManager
else
{
s.send(msg, _queue);
- msg.setDeliveredToConsumer();
}
}
}