diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java | 39 |
1 files changed, 9 insertions, 30 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index a0d86deb19..5cad28b80d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -26,10 +26,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.BasicDeliverBody; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.filter.FilterManager; -import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.protocol.AMQProtocolSession; /** @@ -55,25 +52,19 @@ public class SubscriptionImpl implements Subscription * True if messages need to be acknowledged */ private final boolean _acks; - private FilterManager _filters; public static class Factory implements SubscriptionFactory { - public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters) throws AMQException - { - return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters); - } - public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks) throws AMQException { - return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, null); + return new SubscriptionImpl(channel, protocolSession, consumerTag, acks); } public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag) throws AMQException { - return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null); + return new SubscriptionImpl(channel, protocolSession, consumerTag); } } @@ -81,13 +72,6 @@ public class SubscriptionImpl implements Subscription String consumerTag, boolean acks) throws AMQException { - this(channelId, protocolSession, consumerTag, acks, null); - } - - public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession, - String consumerTag, boolean acks, FieldTable filters) - throws AMQException - { AMQChannel channel = protocolSession.getChannel(channelId); if (channel == null) { @@ -99,9 +83,14 @@ public class SubscriptionImpl implements Subscription this.consumerTag = consumerTag; sessionKey = protocolSession.getKey(); _acks = acks; - _filters = FilterManagerFactory.createManager(filters); } + public SubscriptionImpl(int channel, AMQProtocolSession protocolSession, + String consumerTag) + throws AMQException + { + this(channel, protocolSession, consumerTag, false); + } public boolean equals(Object o) { @@ -142,7 +131,7 @@ public class SubscriptionImpl implements Subscription { // if we do not need to wait for client acknowledgements // we can decrement the reference count immediately. - + // By doing this _before_ the send we ensure that it // doesn't get sent if it can't be dequeued, preventing // duplicate delivery on recovery. @@ -189,16 +178,6 @@ public class SubscriptionImpl implements Subscription channel.queueDeleted(queue); } - public boolean hasFilters() - { - return _filters != null; - } - - public boolean hasInterest(AMQMessage msg) - { - return _filters.allAllow(msg); - } - private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange) { AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag, |