diff options
5 files changed, 23 insertions, 8 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java index b50424868a..efefec58fd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java @@ -30,6 +30,7 @@ import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.filter.SelectorParsingException; import org.apache.qpid.filter.selector.ParseException; import org.apache.qpid.filter.selector.TokenMgrError; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; @@ -118,11 +119,11 @@ public class FilterSupport } } - static final class NoLocalFilter implements MessageFilter + public static final class NoLocalFilter implements MessageFilter { - private final AMQQueue _queue; + private final MessageSource _queue; - public NoLocalFilter(AMQQueue queue) + public NoLocalFilter(MessageSource queue) { _queue = queue; } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index aa465d373f..7e712c8e17 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -44,7 +44,10 @@ import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; +import org.apache.qpid.server.filter.FilterSupport; +import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.logging.LogActor; @@ -512,7 +515,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F * @throws AMQException if something goes wrong */ public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks, - FieldTable filters, boolean exclusive) throws AMQException + FieldTable filters, boolean exclusive, boolean noLocal) throws AMQException { if (tag == null) { @@ -549,6 +552,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F options.add(Consumer.Option.EXCLUSIVE); } + // So to keep things straight we put before the call and catch all exceptions from the register and tidy up. // We add before we register as the Async Delivery process may AutoClose the subscriber // so calling _cT2QM.remove before we have done put which was after the register succeeded. @@ -558,9 +562,18 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F try { + FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(filters)); + if(noLocal) + { + if(filterManager == null) + { + filterManager = new SimpleFilterManager(); + } + filterManager.add(new FilterSupport.NoLocalFilter(source)); + } Consumer sub = source.addConsumer(target, - FilterManagerFactory.createManager(FieldTable.convertToMap(filters)), + filterManager, AMQMessage.class, AMQShortString.toString(tag), options); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java index 526bc9b9fe..b28bb5a0ad 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java @@ -125,7 +125,8 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic queue, !body.getNoAck(), body.getArguments(), - body.getExclusive()); + body.getExclusive(), + body.getNoLocal()); if (!body.getNowait()) { MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java index 281f7345ff..dded0c70fe 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java @@ -140,7 +140,7 @@ public class AcknowledgeTest extends QpidTestCase assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size()); //Subscribe to the queue - AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true); + AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true, false); getQueue().deliverAsync(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java index e895f81c44..f6376e56c4 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java @@ -143,6 +143,6 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase FieldTable filters = new FieldTable(); filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true); - return channel.consumeFromSource(null, queue, true, filters, true); + return channel.consumeFromSource(null, queue, true, filters, true, false); } } |