diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java | 22 |
1 files changed, 4 insertions, 18 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 23ae14eef1..1f25c215cc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -24,7 +24,6 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.common.ClientProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; @@ -320,9 +319,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage private final Boolean _autoClose; - - private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); - private AMQQueue _queue; private final AtomicBoolean _deleted = new AtomicBoolean(false); @@ -479,10 +475,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public boolean hasInterest(QueueEntry entry) { - - - - //check that the message hasn't been rejected if (entry.isRejectedBy(getSubscriptionID())) { @@ -490,27 +482,21 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage { _logger.debug("Subscription:" + this + " rejected message:" + entry); } -// return false; } if (_noLocal) { - AMQMessage message = (AMQMessage) entry.getMessage(); - //todo - client id should be recorded so we don't have to handle - // the case where this is null. - final Object publisher = message.getPublisherIdentifier(); + final Object publisherReference = message.getConnectionIdentifier(); // We don't want local messages so check to see if message is one we sent - Object localInstance = getProtocolSession(); + Object localReference = getProtocolSession().getReference(); - if(publisher.equals(localInstance)) + if(publisherReference != null && publisherReference.equals(localReference)) { return false; } - - } @@ -585,7 +571,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public boolean wouldSuspend(QueueEntry msg) { - return !_creditManager.useCreditForMessage(msg.getMessage().getSize());//_channel.wouldSuspend(msg.getMessage()); + return !_creditManager.useCreditForMessage(msg.getMessage().getSize()); } public boolean trySendLock() |