diff options
author | Arnaud Simon <arnaudsimon@apache.org> | 2007-10-17 12:35:17 +0000 |
---|---|---|
committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-10-17 12:35:17 +0000 |
commit | 3ec5ee10947c7eebd7ac360a287cbb362e781dc9 (patch) | |
tree | a30c4cfe95aa2e02ecf2776a1f35a0a472542a0a | |
parent | 70005cf99a32a343212a21340cdcd6080d438ab0 (diff) | |
download | qpid-python-3ec5ee10947c7eebd7ac360a287cbb362e781dc9.tar.gz |
Updated message selector
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@585457 13f79535-47bb-0310-9956-ffa450edef68
13 files changed, 293 insertions, 69 deletions
diff --git a/java/client/pom.xml b/java/client/pom.xml index bd869d96ca..b58ab9fe3a 100644 --- a/java/client/pom.xml +++ b/java/client/pom.xml @@ -123,6 +123,25 @@ <build> <plugins> + + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>javacc-maven-plugin</artifactId> + <version>2.0</version> + <executions> + <execution> + <phase>generate-sources</phase> + <configuration> + <sourceDirectory>${basedir}/src/main/grammar</sourceDirectory> + <outputDirectory>${basedir}/target/generated-sources</outputDirectory> + <packageName>org.apache.qpidity.filter.selector</packageName> + </configuration> + <goals> + <goal>javacc</goal> + </goals> + </execution> + </executions> + </plugin> <plugin> <artifactId>minijar-maven-plugin</artifactId> diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index af259080a3..58b84910d4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -98,7 +98,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By { _preAcquire = false; } - } + } } // ----- Interface org.apache.qpidity.client.util.MessageListener @@ -245,7 +245,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By { if (getMessageSelector() != null) { - messageOk = _filter.matches((javax.jms.Message) message); + messageOk = _filter.matches(message); } } catch (Exception e) @@ -253,11 +253,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error when evaluating message selector", e); } - System.out.println("---------------------------------------------------------"); - System.out.println("messageOk : " + messageOk + " pre-acquire mode : " + _preAcquire); - System.out.println("---------------------------------------------------------"); - - if (_logger.isDebugEnabled()) + if (_logger.isDebugEnabled()) { _logger.debug("messageOk " + messageOk); _logger.debug("_preAcquire " + _preAcquire); diff --git a/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java index 9baade610b..0b0b794135 100644 --- a/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java @@ -570,7 +570,7 @@ public class MessageConsumerImpl extends MessageActor boolean messageOk = true; if (_messageSelector != null) { - messageOk = _filter.matches((Message) message); + messageOk = false; //_filter.matches(message); } if (_logger.isDebugEnabled()) { diff --git a/java/common/src/main/java/org/apache/qpidity/filter/ArithmeticExpression.java b/java/common/src/main/java/org/apache/qpidity/filter/ArithmeticExpression.java index d4b49bc63e..279716598d 100644 --- a/java/common/src/main/java/org/apache/qpidity/filter/ArithmeticExpression.java +++ b/java/common/src/main/java/org/apache/qpidity/filter/ArithmeticExpression.java @@ -18,8 +18,8 @@ package org.apache.qpidity.filter; import org.apache.qpidity.QpidException; +import org.apache.qpid.client.message.AbstractJMSMessage; -import javax.jms.Message; /** * An expression which performs an operation on two expression values @@ -241,7 +241,7 @@ public abstract class ArithmeticExpression extends BinaryExpression } } - public Object evaluate(Message message) throws QpidException + public Object evaluate(AbstractJMSMessage message) throws QpidException { Object lvalue = left.evaluate(message); if (lvalue == null) diff --git a/java/common/src/main/java/org/apache/qpidity/filter/BooleanExpression.java b/java/common/src/main/java/org/apache/qpidity/filter/BooleanExpression.java index 1314b97a38..9a3b1c3106 100644 --- a/java/common/src/main/java/org/apache/qpidity/filter/BooleanExpression.java +++ b/java/common/src/main/java/org/apache/qpidity/filter/BooleanExpression.java @@ -18,8 +18,8 @@ package org.apache.qpidity.filter; import org.apache.qpidity.QpidException; +import org.apache.qpid.client.message.AbstractJMSMessage; -import javax.jms.Message; /** * A BooleanExpression is an expression that always @@ -28,6 +28,6 @@ import javax.jms.Message; public interface BooleanExpression extends Expression { - public boolean matches(Message message) throws QpidException; + public boolean matches(AbstractJMSMessage message) throws QpidException; } diff --git a/java/common/src/main/java/org/apache/qpidity/filter/ComparisonExpression.java b/java/common/src/main/java/org/apache/qpidity/filter/ComparisonExpression.java index 82fee7ae14..46f387b293 100644 --- a/java/common/src/main/java/org/apache/qpidity/filter/ComparisonExpression.java +++ b/java/common/src/main/java/org/apache/qpidity/filter/ComparisonExpression.java @@ -18,8 +18,8 @@ package org.apache.qpidity.filter; import org.apache.qpidity.QpidException; +import org.apache.qpid.client.message.AbstractJMSMessage; -import javax.jms.Message; import java.util.HashSet; import java.util.List; import java.util.regex.Pattern; @@ -131,7 +131,7 @@ public abstract class ComparisonExpression extends BinaryExpression implements B /** * org.apache.activemq.filter.Expression#evaluate(MessageEvaluationContext) */ - public Object evaluate(Message message) throws QpidException + public Object evaluate(AbstractJMSMessage message) throws QpidException { Object rv = this.getRight().evaluate(message); @@ -151,7 +151,7 @@ public abstract class ComparisonExpression extends BinaryExpression implements B return likePattern.matcher((String) rv).matches() ? Boolean.TRUE : Boolean.FALSE; } - public boolean matches(Message message) throws QpidException + public boolean matches(AbstractJMSMessage message) throws QpidException { Object object = evaluate(message); @@ -234,7 +234,7 @@ public abstract class ComparisonExpression extends BinaryExpression implements B return new ComparisonExpression(left, right) { - public Object evaluate(Message message) throws QpidException + public Object evaluate(AbstractJMSMessage message) throws QpidException { Object lv = left.evaluate(message); Object rv = right.evaluate(message); @@ -417,7 +417,7 @@ public abstract class ComparisonExpression extends BinaryExpression implements B super(left, right); } - public Object evaluate(Message message) throws QpidException + public Object evaluate(AbstractJMSMessage message) throws QpidException { Comparable lv = (Comparable) left.evaluate(message); if (lv == null) @@ -579,7 +579,7 @@ public abstract class ComparisonExpression extends BinaryExpression implements B protected abstract boolean asBoolean(int answer); - public boolean matches(Message message) throws QpidException + public boolean matches(AbstractJMSMessage message) throws QpidException { Object object = evaluate(message); diff --git a/java/common/src/main/java/org/apache/qpidity/filter/ConstantExpression.java b/java/common/src/main/java/org/apache/qpidity/filter/ConstantExpression.java index 49e4bdabea..26aeec2de8 100644 --- a/java/common/src/main/java/org/apache/qpidity/filter/ConstantExpression.java +++ b/java/common/src/main/java/org/apache/qpidity/filter/ConstantExpression.java @@ -18,8 +18,8 @@ package org.apache.qpidity.filter; import org.apache.qpidity.QpidException; +import org.apache.qpid.client.message.AbstractJMSMessage; -import javax.jms.Message; import java.math.BigDecimal; /** @@ -35,7 +35,7 @@ public class ConstantExpression implements Expression super(value); } - public boolean matches(Message message) throws QpidException + public boolean matches(AbstractJMSMessage message) throws QpidException { Object object = evaluate(message); @@ -114,7 +114,7 @@ public class ConstantExpression implements Expression this.value = value; } - public Object evaluate(Message message) throws QpidException + public Object evaluate(AbstractJMSMessage message) throws QpidException { return value; } diff --git a/java/common/src/main/java/org/apache/qpidity/filter/Expression.java b/java/common/src/main/java/org/apache/qpidity/filter/Expression.java index d50338514b..bdc3c9cccc 100644 --- a/java/common/src/main/java/org/apache/qpidity/filter/Expression.java +++ b/java/common/src/main/java/org/apache/qpidity/filter/Expression.java @@ -18,8 +18,8 @@ package org.apache.qpidity.filter; import org.apache.qpidity.QpidException; +import org.apache.qpid.client.message.AbstractJMSMessage; -import javax.jms.Message; /** * Represents an expression @@ -30,5 +30,5 @@ public interface Expression * @param message The message to evaluate * @return the value of this expression */ - public Object evaluate(Message message) throws QpidException; + public Object evaluate(AbstractJMSMessage message) throws QpidException; } diff --git a/java/common/src/main/java/org/apache/qpidity/filter/JMSSelectorFilter.java b/java/common/src/main/java/org/apache/qpidity/filter/JMSSelectorFilter.java index 3e6b9f72a0..c73da1682a 100644 --- a/java/common/src/main/java/org/apache/qpidity/filter/JMSSelectorFilter.java +++ b/java/common/src/main/java/org/apache/qpidity/filter/JMSSelectorFilter.java @@ -21,8 +21,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpidity.QpidException; import org.apache.qpidity.filter.selector.SelectorParser; +import org.apache.qpid.client.message.AbstractJMSMessage; -import javax.jms.Message; public class JMSSelectorFilter implements MessageFilter { @@ -44,7 +44,7 @@ public class JMSSelectorFilter implements MessageFilter _matcher = new SelectorParser().parse(selector); } - public boolean matches(Message message) + public boolean matches(AbstractJMSMessage message) { try { diff --git a/java/common/src/main/java/org/apache/qpidity/filter/LogicExpression.java b/java/common/src/main/java/org/apache/qpidity/filter/LogicExpression.java index 0fc3f42530..7f5909df43 100644 --- a/java/common/src/main/java/org/apache/qpidity/filter/LogicExpression.java +++ b/java/common/src/main/java/org/apache/qpidity/filter/LogicExpression.java @@ -18,8 +18,8 @@ package org.apache.qpidity.filter; import org.apache.qpidity.QpidException; +import org.apache.qpid.client.message.AbstractJMSMessage; -import javax.jms.Message; /** * A filter performing a comparison of two objects @@ -32,7 +32,7 @@ public abstract class LogicExpression extends BinaryExpression implements Boolea return new LogicExpression(lvalue, rvalue) { - public Object evaluate(Message message) throws QpidException + public Object evaluate(AbstractJMSMessage message) throws QpidException { Boolean lv = (Boolean) left.evaluate(message); @@ -59,7 +59,7 @@ public abstract class LogicExpression extends BinaryExpression implements Boolea return new LogicExpression(lvalue, rvalue) { - public Object evaluate(Message message) throws QpidException + public Object evaluate(AbstractJMSMessage message) throws QpidException { Boolean lv = (Boolean) left.evaluate(message); @@ -96,9 +96,9 @@ public abstract class LogicExpression extends BinaryExpression implements Boolea super(left, right); } - public abstract Object evaluate(Message message) throws QpidException; + public abstract Object evaluate(AbstractJMSMessage message) throws QpidException; - public boolean matches(Message message) throws QpidException + public boolean matches(AbstractJMSMessage message) throws QpidException { Object object = evaluate(message); diff --git a/java/common/src/main/java/org/apache/qpidity/filter/MessageFilter.java b/java/common/src/main/java/org/apache/qpidity/filter/MessageFilter.java index a07a9c6080..aa1303a373 100644 --- a/java/common/src/main/java/org/apache/qpidity/filter/MessageFilter.java +++ b/java/common/src/main/java/org/apache/qpidity/filter/MessageFilter.java @@ -18,10 +18,10 @@ package org.apache.qpidity.filter; import org.apache.qpidity.QpidException; +import org.apache.qpid.client.message.AbstractJMSMessage; -import javax.jms.Message; public interface MessageFilter { - boolean matches(Message message) throws QpidException; + boolean matches(AbstractJMSMessage message) throws QpidException; } diff --git a/java/common/src/main/java/org/apache/qpidity/filter/PropertyExpression.java b/java/common/src/main/java/org/apache/qpidity/filter/PropertyExpression.java index d52dfacfa3..5ea2004d75 100644 --- a/java/common/src/main/java/org/apache/qpidity/filter/PropertyExpression.java +++ b/java/common/src/main/java/org/apache/qpidity/filter/PropertyExpression.java @@ -17,70 +17,279 @@ */ package org.apache.qpidity.filter; -import org.slf4j.LoggerFactory; -import org.apache.qpidity.ErrorCode; +import org.apache.qpid.framing.CommonContentHeaderProperties; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpidity.QpidException; +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; -import javax.jms.Message; -import java.lang.reflect.Method; +import javax.jms.JMSException; +import java.util.HashMap; /** * Represents a property expression */ public class PropertyExpression implements Expression { - private static final org.slf4j.Logger _logger = LoggerFactory.getLogger(PropertyExpression.class); + // Constants - defined the same as JMS + private static final int NON_PERSISTENT = 1; + private static final int DEFAULT_PRIORITY = 4; + + private static final Logger _logger = LoggerFactory.getLogger(PropertyExpression.class); + + private static final HashMap<String, Expression> JMS_PROPERTY_EXPRESSIONS = new HashMap<String, Expression>(); + + static + { + JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new Expression() + { + public Object evaluate(AbstractJMSMessage message) + { + //TODO + return null; + } + }); + JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo", new Expression() + { + public Object evaluate(AbstractJMSMessage message) + { + try + { + CommonContentHeaderProperties _properties = + message.getContentHeaderProperties(); + AMQShortString replyTo = _properties.getReplyTo(); + + return (replyTo == null) ? null : replyTo.toString(); + } + catch (Exception e) + { + _logger.warn("Error evaluating property", e); + + return null; + } + } + }); + + JMS_PROPERTY_EXPRESSIONS.put("JMSType", new Expression() + { + public Object evaluate(AbstractJMSMessage message) + { + try + { + CommonContentHeaderProperties _properties = + message.getContentHeaderProperties(); + AMQShortString type = _properties.getType(); + + return (type == null) ? null : type.toString(); + } + catch (Exception e) + { + _logger.warn("Error evaluating property", e); + + return null; + } + + } + }); + + JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode", new Expression() + { + public Object evaluate(AbstractJMSMessage message) + { + try + { + int mode = message.getJMSDeliveryMode(); + if (_logger.isDebugEnabled()) + { + _logger.debug("JMSDeliveryMode is :" + mode); + } + + return mode; + } + catch (Exception e) + { + _logger.warn("Error evaluating property",e); + } + + return NON_PERSISTENT; + } + }); + + JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", new Expression() + { + public Object evaluate(AbstractJMSMessage message) + { + try + { + CommonContentHeaderProperties _properties = + message.getContentHeaderProperties(); + return (int) _properties.getPriority(); + } + catch (Exception e) + { + _logger.warn("Error evaluating property",e); + } + + return DEFAULT_PRIORITY; + } + }); + + JMS_PROPERTY_EXPRESSIONS.put("AMQMessageID", new Expression() + { + public Object evaluate(AbstractJMSMessage message) + { + + try + { + CommonContentHeaderProperties _properties = + message.getContentHeaderProperties(); + AMQShortString messageId = _properties.getMessageId(); + + return (messageId == null) ? null : messageId; + } + catch (Exception e) + { + _logger.warn("Error evaluating property",e); + + return null; + } + + } + }); + + JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", new Expression() + { + public Object evaluate(AbstractJMSMessage message) + { + try + { + CommonContentHeaderProperties _properties = + message.getContentHeaderProperties(); + return _properties.getTimestamp(); + } + catch (Exception e) + { + _logger.warn("Error evaluating property",e); + + return null; + } - private Method _getter; + } + }); + + JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID", new Expression() + { + public Object evaluate(AbstractJMSMessage message) + { + + try + { + CommonContentHeaderProperties _properties = + message.getContentHeaderProperties(); + AMQShortString correlationId = _properties.getCorrelationId(); + return (correlationId == null) ? null : correlationId.toString(); + } + catch (Exception e) + { + _logger.warn("Error evaluating property",e); + + return null; + } + + } + }); + + JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new Expression() + { + public Object evaluate(AbstractJMSMessage message) + { + + try + { + CommonContentHeaderProperties _properties = + message.getContentHeaderProperties(); + return _properties.getExpiration(); + } + catch (Exception e) + { + _logger.warn("Error evaluating property",e); + return null; + } + + } + }); + + JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new Expression() + { + public Object evaluate(AbstractJMSMessage message) + { + try + { + return message.getJMSRedelivered(); + } + catch (JMSException e) + { + _logger.warn("Error evaluating property",e); + return null; + } + } + }); + + } + + private final String name; + private final Expression jmsPropertyExpression; public PropertyExpression(String name) { - Class clazz = Message.class; - try - { - _getter = clazz.getMethod("get" + name, null); - } - catch (NoSuchMethodException e) - { - PropertyExpression._logger.warn("Cannot compare property: " + name, e); - } + this.name = name; + jmsPropertyExpression = JMS_PROPERTY_EXPRESSIONS.get(name); } - public Object evaluate(Message message) throws QpidException + public Object evaluate(AbstractJMSMessage message) throws QpidException { - Object result = null; - if( _getter != null ) + + if (jmsPropertyExpression != null) { - try - { - result = _getter.invoke(message, null); - } - catch (Exception e) + return jmsPropertyExpression.evaluate(message); + } + else + { + + CommonContentHeaderProperties _properties = message.getContentHeaderProperties(); + if (_logger.isDebugEnabled()) { - throw new QpidException("cannot evaluate property ", ErrorCode.UNDEFINED, e); + _logger.debug("Looking up property:" + name); + _logger.debug("Properties are:" + _properties.getHeaders().keySet()); } + return _properties.getHeaders().getObject(name); } - return result; + } + + public String getName() + { + return name; } /** - * @see Object#toString() + * @see java.lang.Object#toString() */ public String toString() { - return _getter.toString(); + return name; } /** - * @see Object#hashCode() + * @see java.lang.Object#hashCode() */ public int hashCode() { - return _getter.hashCode(); + return name.hashCode(); } /** - * @see Object#equals(Object) + * @see java.lang.Object#equals(java.lang.Object) */ public boolean equals(Object o) { @@ -88,7 +297,7 @@ public class PropertyExpression implements Expression { return false; } - return _getter.equals(((PropertyExpression) o)._getter); + return name.equals(((PropertyExpression) o).name); } } diff --git a/java/common/src/main/java/org/apache/qpidity/filter/UnaryExpression.java b/java/common/src/main/java/org/apache/qpidity/filter/UnaryExpression.java index 01bd2f8d6c..f73449679c 100644 --- a/java/common/src/main/java/org/apache/qpidity/filter/UnaryExpression.java +++ b/java/common/src/main/java/org/apache/qpidity/filter/UnaryExpression.java @@ -18,8 +18,8 @@ package org.apache.qpidity.filter; import org.apache.qpidity.QpidException; +import org.apache.qpid.client.message.AbstractJMSMessage; -import javax.jms.Message; import java.math.BigDecimal; import java.util.List; import java.util.Collection; @@ -39,7 +39,7 @@ public abstract class UnaryExpression implements Expression { return new UnaryExpression(left) { - public Object evaluate(Message message) throws QpidException + public Object evaluate(AbstractJMSMessage message) throws QpidException { Object rvalue = right.evaluate(message); if (rvalue == null) @@ -84,7 +84,7 @@ public abstract class UnaryExpression implements Expression return new BooleanUnaryExpression(right) { - public Object evaluate(Message message) throws QpidException + public Object evaluate(AbstractJMSMessage message) throws QpidException { Object rvalue = right.evaluate(message); @@ -156,7 +156,7 @@ public abstract class UnaryExpression implements Expression super(left); } - public boolean matches(Message message) throws QpidException + public boolean matches(AbstractJMSMessage message) throws QpidException { Object object = evaluate(message); @@ -170,7 +170,7 @@ public abstract class UnaryExpression implements Expression { return new BooleanUnaryExpression(left) { - public Object evaluate(Message message) throws QpidException + public Object evaluate(AbstractJMSMessage message) throws QpidException { Boolean lvalue = (Boolean) right.evaluate(message); if (lvalue == null) @@ -191,7 +191,7 @@ public abstract class UnaryExpression implements Expression { return new BooleanUnaryExpression(left) { - public Object evaluate(Message message) throws QpidException + public Object evaluate(AbstractJMSMessage message) throws QpidException { Object rvalue = right.evaluate(message); if (rvalue == null) |