summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java39
1 files changed, 9 insertions, 30 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index a0d86deb19..5cad28b80d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -26,10 +26,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.protocol.AMQProtocolSession;
/**
@@ -55,25 +52,19 @@ public class SubscriptionImpl implements Subscription
* True if messages need to be acknowledged
*/
private final boolean _acks;
- private FilterManager _filters;
public static class Factory implements SubscriptionFactory
{
- public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters) throws AMQException
- {
- return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters);
- }
-
public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, null);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, acks);
}
public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag);
}
}
@@ -81,13 +72,6 @@ public class SubscriptionImpl implements Subscription
String consumerTag, boolean acks)
throws AMQException
{
- this(channelId, protocolSession, consumerTag, acks, null);
- }
-
- public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
- String consumerTag, boolean acks, FieldTable filters)
- throws AMQException
- {
AMQChannel channel = protocolSession.getChannel(channelId);
if (channel == null)
{
@@ -99,9 +83,14 @@ public class SubscriptionImpl implements Subscription
this.consumerTag = consumerTag;
sessionKey = protocolSession.getKey();
_acks = acks;
- _filters = FilterManagerFactory.createManager(filters);
}
+ public SubscriptionImpl(int channel, AMQProtocolSession protocolSession,
+ String consumerTag)
+ throws AMQException
+ {
+ this(channel, protocolSession, consumerTag, false);
+ }
public boolean equals(Object o)
{
@@ -142,7 +131,7 @@ public class SubscriptionImpl implements Subscription
{
// if we do not need to wait for client acknowledgements
// we can decrement the reference count immediately.
-
+
// By doing this _before_ the send we ensure that it
// doesn't get sent if it can't be dequeued, preventing
// duplicate delivery on recovery.
@@ -189,16 +178,6 @@ public class SubscriptionImpl implements Subscription
channel.queueDeleted(queue);
}
- public boolean hasFilters()
- {
- return _filters != null;
- }
-
- public boolean hasInterest(AMQMessage msg)
- {
- return _filters.allAllow(msg);
- }
-
private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
{
AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag,