summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java232
1 files changed, 203 insertions, 29 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
index 3a8a86e654..27166e4384 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
@@ -20,29 +20,39 @@
*/
package org.apache.qpid.server.exchange;
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.management.JMException;
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.filter.SelectorParsingException;
+import org.apache.qpid.filter.selector.ParseException;
+import org.apache.qpid.filter.selector.TokenMgrError;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.exchange.topic.*;
+import org.apache.qpid.server.exchange.topic.TopicExchangeResult;
+import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
+import org.apache.qpid.server.exchange.topic.TopicNormalizer;
+import org.apache.qpid.server.exchange.topic.TopicParser;
import org.apache.qpid.server.filter.JMSSelectorFilter;
+import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.InboundMessage;
-
-import javax.management.JMException;
-import java.sql.Array;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import java.lang.ref.WeakReference;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.Filterable;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class TopicExchange extends AbstractExchange
{
@@ -113,24 +123,26 @@ public class TopicExchange extends AbstractExchange
FieldTable oldArgs = _bindings.get(binding);
TopicExchangeResult result = _topicExchangeResults.get(routingKey);
- if(argumentsContainSelector(args))
+ if(argumentsContainFilter(args))
{
- if(argumentsContainSelector(oldArgs))
+ if(argumentsContainFilter(oldArgs))
{
- result.replaceQueueFilter(queue,createSelectorFilter(oldArgs), createSelectorFilter(args));
+ result.replaceQueueFilter(queue,
+ createMessageFilter(oldArgs, queue),
+ createMessageFilter(args, queue));
}
else
{
- result.addFilteredQueue(queue,createSelectorFilter(args));
+ result.addFilteredQueue(queue, createMessageFilter(args, queue));
result.removeUnfilteredQueue(queue);
}
}
else
{
- if(argumentsContainSelector(oldArgs))
+ if(argumentsContainFilter(oldArgs))
{
result.addUnfilteredQueue(queue);
- result.removeFilteredQueue(queue, createSelectorFilter(oldArgs));
+ result.removeFilteredQueue(queue, createMessageFilter(oldArgs, queue));
}
else
{
@@ -149,9 +161,9 @@ public class TopicExchange extends AbstractExchange
if(result == null)
{
result = new TopicExchangeResult();
- if(argumentsContainSelector(args))
+ if(argumentsContainFilter(args))
{
- result.addFilteredQueue(queue, createSelectorFilter(args));
+ result.addFilteredQueue(queue, createMessageFilter(args, queue));
}
else
{
@@ -162,9 +174,9 @@ public class TopicExchange extends AbstractExchange
}
else
{
- if(argumentsContainSelector(args))
+ if(argumentsContainFilter(args))
{
- result.addFilteredQueue(queue, createSelectorFilter(args));
+ result.addFilteredQueue(queue, createMessageFilter(args, queue));
}
else
{
@@ -178,26 +190,74 @@ public class TopicExchange extends AbstractExchange
}
- private JMSSelectorFilter createSelectorFilter(final FieldTable args) throws AMQInvalidArgumentException
+ private MessageFilter createMessageFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException
{
+ if(argumentsContainNoLocal(args))
+ {
+ MessageFilter filter = new NoLocalFilter(queue);
+
+ if(argumentsContainJMSSelector(args))
+ {
+ filter = new CompoundFilter(filter, createJMSSelectorFilter(args));
+ }
+ return filter;
+ }
+ else
+ {
+ return createJMSSelectorFilter(args);
+ }
+
+ }
+
+ private MessageFilter createJMSSelectorFilter(FieldTable args) throws AMQInvalidArgumentException
+ {
final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue());
WeakReference<JMSSelectorFilter> selectorRef = _selectorCache.get(selectorString);
JMSSelectorFilter selector = null;
if(selectorRef == null || (selector = selectorRef.get())==null)
{
- selector = new JMSSelectorFilter(selectorString);
+ try
+ {
+ selector = new JMSSelectorFilter(selectorString);
+ }
+ catch (ParseException e)
+ {
+ throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
+ }
+ catch (SelectorParsingException e)
+ {
+ throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
+ }
+ catch (TokenMgrError e)
+ {
+ throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
+ }
_selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter>(selector));
}
return selector;
}
- private static boolean argumentsContainSelector(final FieldTable args)
+ private static boolean argumentsContainFilter(final FieldTable args)
{
- return args != null && args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()) && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0;
+ return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args);
}
+ private static boolean argumentsContainNoLocal(final FieldTable args)
+ {
+ return args != null
+ && args.containsKey(AMQPFilterTypes.NO_LOCAL.getValue())
+ && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.getValue()));
+ }
+
+ private static boolean argumentsContainJMSSelector(final FieldTable args)
+ {
+ return args != null && (args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue())
+ && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0);
+ }
+
+
public ArrayList<BaseQueue> doRoute(InboundMessage payload)
{
@@ -251,6 +311,28 @@ public class TopicExchange extends AbstractExchange
}
}
+ public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue)
+ {
+ Binding binding = new Binding(null, bindingKey, queue, this, arguments);
+ if (arguments == null)
+ {
+ return _bindings.containsKey(binding);
+ }
+ else
+ {
+ FieldTable o = _bindings.get(binding);
+ if (o != null)
+ {
+ return arguments.equals(FieldTable.convertToMap(o));
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ }
+
public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
return isBound(routingKey, null, queue);
@@ -297,11 +379,11 @@ public class TopicExchange extends AbstractExchange
result.removeBinding(binding);
- if(argumentsContainSelector(bindingArgs))
+ if(argumentsContainFilter(bindingArgs))
{
try
{
- result.removeFilteredQueue(binding.getQueue(), createSelectorFilter(bindingArgs));
+ result.removeFilteredQueue(binding.getQueue(), createMessageFilter(bindingArgs, binding.getQueue()));
}
catch (AMQInvalidArgumentException e)
{
@@ -378,4 +460,96 @@ public class TopicExchange extends AbstractExchange
deregisterQueue(binding);
}
+ private static final class NoLocalFilter implements MessageFilter
+ {
+ private final AMQQueue _queue;
+
+ public NoLocalFilter(AMQQueue queue)
+ {
+ _queue = queue;
+ }
+
+ public boolean matches(Filterable message)
+ {
+ InboundMessage inbound = (InboundMessage) message;
+ final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession();
+ return exclusiveOwningSession == null || !exclusiveOwningSession.onSameConnection(inbound);
+
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ NoLocalFilter that = (NoLocalFilter) o;
+
+ return _queue == null ? that._queue == null : _queue.equals(that._queue);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return _queue != null ? _queue.hashCode() : 0;
+ }
+ }
+
+ private static final class CompoundFilter implements MessageFilter
+ {
+ private MessageFilter _noLocalFilter;
+ private MessageFilter _jmsSelectorFilter;
+
+ public CompoundFilter(MessageFilter filter, MessageFilter jmsSelectorFilter)
+ {
+ _noLocalFilter = filter;
+ _jmsSelectorFilter = jmsSelectorFilter;
+ }
+
+ public boolean matches(Filterable message)
+ {
+ return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ CompoundFilter that = (CompoundFilter) o;
+
+ if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null)
+ {
+ return false;
+ }
+ if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null)
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = _noLocalFilter != null ? _noLocalFilter.hashCode() : 0;
+ result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0);
+ return result;
+ }
+ }
}