diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-04-19 14:42:53 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-04-19 14:42:53 +0000 |
commit | e42ecdbd837f658b420437f11e8f9b639789fa73 (patch) | |
tree | 283cd26e5edac27135111e56e3a039fb4f0770f5 | |
parent | 4e57de5d68f750dad1604aa19ab7844629c60f7c (diff) | |
download | qpid-python-e42ecdbd837f658b420437f11e8f9b639789fa73.tar.gz |
QPID-459 - NoLocal broken when messages already exist on queue from consumer. With test.
AMQChannel remove comment around setPublisher - this is used by noLocal implementation.
Subscription - rename of hasFilters to filtersMessages
AMQQueue/RemoteSubscriptionImpl/SubscriptionTestHelper/SubscriptionSet - rename of hasFilters to filtersMessages
SubscriptionImpl - rename of hasFilters to filtersMessages and changes to include noLocal in that check.
TopicSessionTest - Additional testing for NoLocal to ensure.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@530432 13f79535-47bb-0310-9956-ffa450edef68
7 files changed, 28 insertions, 21 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 2e1653e69d..97c95dae5e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -197,7 +197,6 @@ public class AMQChannel _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info, _txnContext); - // TODO: used in clustering only I think (RG) _currentMessage.setPublisher(publisher); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index a418bb8f8a..65d5906d05 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.queue; import java.text.MessageFormat; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; @@ -237,8 +236,10 @@ public class AMQQueue implements Managable, Comparable /** * Returns messages within the given range of message Ids + * * @param fromMessageId * @param toMessageId + * * @return List of messages */ public List<AMQMessage> getMessagesOnTheQueue(long fromMessageId, long toMessageId) @@ -253,6 +254,7 @@ public class AMQQueue implements Managable, Comparable /** * @param messageId + * * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist. */ public AMQMessage getMessageOnTheQueue(long messageId) @@ -267,10 +269,10 @@ public class AMQQueue implements Managable, Comparable /** * moves messages from this queue to another queue. to do this the approach is following- - setup the queue for - * moving messages (stop the async delivery) - get all the messages available in the given message - * id range - setup the other queue for moving messages (stop the async delivery) - send these - * available messages to the other queue (enqueue in other queue) - Once sending to other Queue is successful, - * remove messages from this queue - remove locks from both queues and start async delivery + * moving messages (stop the async delivery) - get all the messages available in the given message id range - setup + * the other queue for moving messages (stop the async delivery) - send these available messages to the other queue + * (enqueue in other queue) - Once sending to other Queue is successful, remove messages from this queue - remove + * locks from both queues and start async delivery * * @param fromMessageId * @param toMessageId @@ -442,7 +444,7 @@ public class AMQQueue implements Managable, Comparable Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this); - if (subscription.hasFilters()) + if (subscription.filtersMessages()) { if (_deliveryMgr.hasQueuedMessages()) { @@ -641,7 +643,7 @@ public class AMQQueue implements Managable, Comparable { _totalMessagesReceived.incrementAndGet(); } - + try { _managedObject.checkForNotification(msg); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java index e9f209839a..e6d5d0c88d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java @@ -32,7 +32,7 @@ public interface Subscription void queueDeleted(AMQQueue queue) throws AMQException; - boolean hasFilters(); + boolean filtersMessages(); boolean hasInterest(AMQMessage msg); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index e3944954f3..3bce950ba9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -158,7 +158,7 @@ public class SubscriptionImpl implements Subscription } - if (_filters != null) + if (filtersMessages()) { _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>(); } @@ -346,9 +346,9 @@ public class SubscriptionImpl implements Subscription channel.queueDeleted(queue); } - public boolean hasFilters() + public boolean filtersMessages() { - return _filters != null; + return _filters != null || _noLocal; } public boolean hasInterest(AMQMessage msg) @@ -363,7 +363,10 @@ public class SubscriptionImpl implements Subscription // return false; } - if (_noLocal) + final AMQProtocolSession publisher = msg.getPublisher(); + + //todo - client id should be recoreded and this test removed but handled below + if (_noLocal && publisher != null) { // We don't want local messages so check to see if message is one we sent Object localInstance; @@ -372,8 +375,9 @@ public class SubscriptionImpl implements Subscription if ((protocolSession.getClientProperties() != null) && (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { - if ((msg.getPublisher().getClientProperties() != null) && - (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) + + if ((publisher.getClientProperties() != null) && + (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { if (localInstance == msgInstance || localInstance.equals(msgInstance)) { @@ -388,8 +392,11 @@ public class SubscriptionImpl implements Subscription } else { + localInstance = protocolSession.getClientIdentifier(); - msgInstance = msg.getPublisher().getClientIdentifier(); + //todo - client id should be recoreded and this test removed but handled here + + msgInstance = publisher.getClientIdentifier(); if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) { if (_logger.isTraceEnabled()) @@ -399,7 +406,6 @@ public class SubscriptionImpl implements Subscription } return false; } - } @@ -623,7 +629,7 @@ public class SubscriptionImpl implements Subscription return _resendQueue; } - if (_filters != null) + if (filtersMessages()) { if (isAutoClose()) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index 26b040aae0..b500247fa4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -157,7 +157,7 @@ class SubscriptionSet implements WeightedSubscriptionManager //FIXME the queue could be full of sent messages. // Either need to clean all PDQs after sending a message // OR have a clean up thread that runs the PDQs expunging the messages. - if (!subscription.hasFilters() || subscription.getPreDeliveryQueue().isEmpty()) + if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty()) { return subscription; } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java index a5ace41752..42412bebae 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java @@ -102,7 +102,7 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage } } - public boolean hasFilters() + public boolean filtersMessages() { return false; } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index 01eb2ba6a2..1a0a341bbf 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -87,7 +87,7 @@ public class SubscriptionTestHelper implements Subscription { } - public boolean hasFilters() + public boolean filtersMessages() { return false; } |