summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java7
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java17
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java3
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java2
5 files changed, 23 insertions, 8 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
index b50424868a..efefec58fd 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
@@ -30,6 +30,7 @@ import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.filter.selector.TokenMgrError;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
@@ -118,11 +119,11 @@ public class FilterSupport
}
}
- static final class NoLocalFilter implements MessageFilter
+ public static final class NoLocalFilter implements MessageFilter
{
- private final AMQQueue _queue;
+ private final MessageSource _queue;
- public NoLocalFilter(AMQQueue queue)
+ public NoLocalFilter(MessageSource queue)
{
_queue = queue;
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index aa465d373f..7e712c8e17 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -44,7 +44,10 @@ import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
+import org.apache.qpid.server.filter.FilterSupport;
+import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.logging.LogActor;
@@ -512,7 +515,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
* @throws AMQException if something goes wrong
*/
public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks,
- FieldTable filters, boolean exclusive) throws AMQException
+ FieldTable filters, boolean exclusive, boolean noLocal) throws AMQException
{
if (tag == null)
{
@@ -549,6 +552,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
options.add(Consumer.Option.EXCLUSIVE);
}
+
// So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
// We add before we register as the Async Delivery process may AutoClose the subscriber
// so calling _cT2QM.remove before we have done put which was after the register succeeded.
@@ -558,9 +562,18 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
try
{
+ FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(filters));
+ if(noLocal)
+ {
+ if(filterManager == null)
+ {
+ filterManager = new SimpleFilterManager();
+ }
+ filterManager.add(new FilterSupport.NoLocalFilter(source));
+ }
Consumer sub =
source.addConsumer(target,
- FilterManagerFactory.createManager(FieldTable.convertToMap(filters)),
+ filterManager,
AMQMessage.class,
AMQShortString.toString(tag),
options);
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
index 526bc9b9fe..b28bb5a0ad 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
@@ -125,7 +125,8 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
queue,
!body.getNoAck(),
body.getArguments(),
- body.getExclusive());
+ body.getExclusive(),
+ body.getNoLocal());
if (!body.getNowait())
{
MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
index 281f7345ff..dded0c70fe 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
@@ -140,7 +140,7 @@ public class AcknowledgeTest extends QpidTestCase
assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size());
//Subscribe to the queue
- AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true);
+ AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true, false);
getQueue().deliverAsync();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
index e895f81c44..f6376e56c4 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
@@ -143,6 +143,6 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase
FieldTable filters = new FieldTable();
filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
- return channel.consumeFromSource(null, queue, true, filters, true);
+ return channel.consumeFromSource(null, queue, true, filters, true, false);
}
}