diff options
author | Robert Greig <rgreig@apache.org> | 2006-12-26 21:04:28 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2006-12-26 21:04:28 +0000 |
commit | 8727d212214ddcfe4a1f945ab14a0cd8d59b9837 (patch) | |
tree | 9ad3f470957eb36921ab4a8c354343b160b22f8e | |
parent | 0d5658dfb5a210a6987ef75e2d1813944ab79c4c (diff) | |
download | qpid-python-8727d212214ddcfe4a1f945ab14a0cd8d59b9837.tar.gz |
Merge of trunk up to rev 489403
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@490372 13f79535-47bb-0310-9956-ffa450edef68
99 files changed, 6486 insertions, 272 deletions
diff --git a/java/broker/etc/config.xml b/java/broker/etc/config.xml index 0862588a0d..40e2f468e0 100644 --- a/java/broker/etc/config.xml +++ b/java/broker/etc/config.xml @@ -82,8 +82,8 @@ <auto_register>true</auto_register> </queue> <store> - <!--<class>org.apache.qpid.server.store.MemoryMessageStore</class>--> - <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class> + <class>org.apache.qpid.server.store.MemoryMessageStore</class> + <!--<class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>--> </store> <virtualhosts>${conf}/virtualhosts.xml</virtualhosts> </broker> diff --git a/java/broker/etc/qpid-server.conf.jpp b/java/broker/etc/qpid-server.conf.jpp new file mode 100644 index 0000000000..3ed2431ef3 --- /dev/null +++ b/java/broker/etc/qpid-server.conf.jpp @@ -0,0 +1,49 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +QPID_LIBS=$(build-classpath backport-util-concurrent \ + commons-beanutils \ + commons-beanutils-core \ + commons-cli \ + commons-codec \ + commons-collections \ + commons-configuration \ + commons-digester \ + commons-lang \ + commons-logging \ + commons-logging-api \ + dom4j \ + geronimo-jms-1.1-api \ + isorelax \ + jaxen \ + log4j \ + mina/core \ + mina/filter-ssl \ + mina/java5 \ + msv-msv \ + qpid-broker \ + qpid-client \ + qpid-common \ + relaxngDatatype \ + slf4j) + +export JAVA=java \ + JAVA_VM=-server \ + JAVA_MEM=-Xmx1024m \ + CLASSPATH=$QPID_LIBS diff --git a/java/broker/pom.xml b/java/broker/pom.xml index aea2d5878a..92a3d69060 100644 --- a/java/broker/pom.xml +++ b/java/broker/pom.xml @@ -34,7 +34,6 @@ <properties> <topDirectoryLocation>..</topDirectoryLocation> - <amqj.logging.level>warn</amqj.logging.level> </properties> <dependencies> @@ -55,6 +54,10 @@ <artifactId>commons-lang</artifactId> </dependency> <dependency> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-jms_1.1_spec</artifactId> + </dependency> + <dependency> <groupId>org.apache.mina</groupId> <artifactId>mina-filter-ssl</artifactId> </dependency> @@ -84,6 +87,29 @@ <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + </plugin> + + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>javacc-maven-plugin</artifactId> + <version>2.0</version> + <executions> + <execution> + <phase>generate-sources</phase> + <configuration> + <sourceDirectory>${basedir}/src/main/grammar</sourceDirectory> + <outputDirectory>${basedir}/target/generated-sources</outputDirectory> + <packageName>org.apache.qpid.server.filter.jms.selector</packageName> + </configuration> + <goals> + <goal>javacc</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <configuration> <systemProperties> diff --git a/java/broker/src/main/grammar/SelectorParser.jj b/java/broker/src/main/grammar/SelectorParser.jj new file mode 100644 index 0000000000..5553a46e47 --- /dev/null +++ b/java/broker/src/main/grammar/SelectorParser.jj @@ -0,0 +1,598 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+ //
+ // Original File from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+ //
+
+// ----------------------------------------------------------------------------
+// OPTIONS
+// ----------------------------------------------------------------------------
+options {
+ STATIC = false;
+ UNICODE_INPUT = true;
+
+ // some performance optimizations
+ OPTIMIZE_TOKEN_MANAGER = true;
+ ERROR_REPORTING = false;
+}
+
+// ----------------------------------------------------------------------------
+// PARSER
+// ----------------------------------------------------------------------------
+
+PARSER_BEGIN(SelectorParser)
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.filter.jms.selector;
+
+import java.io.*;
+import java.util.*;
+
+import javax.jms.InvalidSelectorException;
+
+import org.apache.qpid.server.filter.*;
+
+/**
+ * JMS Selector Parser generated by JavaCC
+ *
+ * Do not edit this .java file directly - it is autogenerated from SelectorParser.jj
+ */
+public class SelectorParser {
+
+ public SelectorParser() {
+ this(new StringReader(""));
+ }
+
+ public BooleanExpression parse(String sql) throws InvalidSelectorException {
+ this.ReInit(new StringReader(sql));
+
+ try {
+ return this.JmsSelector();
+ }
+ catch (Throwable e) {
+ throw (InvalidSelectorException)new InvalidSelectorException(sql).initCause(e);
+ }
+
+ }
+
+ private BooleanExpression asBooleanExpression(Expression value) throws ParseException {
+ if (value instanceof BooleanExpression) {
+ return (BooleanExpression) value;
+ }
+ if (value instanceof PropertyExpression) {
+ return UnaryExpression.createBooleanCast( value );
+ }
+ throw new ParseException("Expression will not result in a boolean value: " + value);
+ }
+
+
+}
+
+PARSER_END(SelectorParser)
+
+// ----------------------------------------------------------------------------
+// Tokens
+// ----------------------------------------------------------------------------
+
+/* White Space */
+SPECIAL_TOKEN :
+{
+ " " | "\t" | "\n" | "\r" | "\f"
+}
+
+/* Comments */
+SKIP:
+{
+ <LINE_COMMENT: "--" (~["\n","\r"])* ("\n"|"\r"|"\r\n") >
+}
+
+SKIP:
+{
+ <BLOCK_COMMENT: "/*" (~["*"])* "*" ("*" | (~["*","/"] (~["*"])* "*"))* "/">
+}
+
+/* Reserved Words */
+TOKEN [IGNORE_CASE] :
+{
+ < NOT : "NOT">
+ | < AND : "AND">
+ | < OR : "OR">
+ | < BETWEEN : "BETWEEN">
+ | < LIKE : "LIKE">
+ | < ESCAPE : "ESCAPE">
+ | < IN : "IN">
+ | < IS : "IS">
+ | < TRUE : "TRUE" >
+ | < FALSE : "FALSE" >
+ | < NULL : "NULL" >
+ | < XPATH : "XPATH" >
+ | < XQUERY : "XQUERY" >
+}
+
+/* Literals */
+TOKEN [IGNORE_CASE] :
+{
+
+ < DECIMAL_LITERAL: ["1"-"9"] (["0"-"9"])* (["l","L"])? >
+ | < HEX_LITERAL: "0" ["x","X"] (["0"-"9","a"-"f","A"-"F"])+ >
+ | < OCTAL_LITERAL: "0" (["0"-"7"])* >
+ | < FLOATING_POINT_LITERAL:
+ (["0"-"9"])+ "." (["0"-"9"])* (<EXPONENT>)? // matches: 5.5 or 5. or 5.5E10 or 5.E10
+ | "." (["0"-"9"])+ (<EXPONENT>)? // matches: .5 or .5E10
+ | (["0"-"9"])+ <EXPONENT> // matches: 5E10
+ >
+ | < #EXPONENT: "E" (["+","-"])? (["0"-"9"])+ >
+ | < STRING_LITERAL: "'" ( ("''") | ~["'"] )* "'" >
+}
+
+TOKEN [IGNORE_CASE] :
+{
+ < ID : ["a"-"z", "_", "$"] (["a"-"z","0"-"9","_", "$"])* >
+}
+
+// ----------------------------------------------------------------------------
+// Grammer
+// ----------------------------------------------------------------------------
+BooleanExpression JmsSelector() :
+{
+ Expression left=null;
+}
+{
+ (
+ left = orExpression()
+ )
+ {
+ return asBooleanExpression(left);
+ }
+
+}
+
+Expression orExpression() :
+{
+ Expression left;
+ Expression right;
+}
+{
+ (
+ left = andExpression()
+ (
+ <OR> right = andExpression()
+ {
+ left = LogicExpression.createOR(asBooleanExpression(left), asBooleanExpression(right));
+ }
+ )*
+ )
+ {
+ return left;
+ }
+
+}
+
+
+Expression andExpression() :
+{
+ Expression left;
+ Expression right;
+}
+{
+ (
+ left = equalityExpression()
+ (
+ <AND> right = equalityExpression()
+ {
+ left = LogicExpression.createAND(asBooleanExpression(left), asBooleanExpression(right));
+ }
+ )*
+ )
+ {
+ return left;
+ }
+}
+
+Expression equalityExpression() :
+{
+ Expression left;
+ Expression right;
+}
+{
+ (
+ left = comparisonExpression()
+ (
+
+ "=" right = comparisonExpression()
+ {
+ left = ComparisonExpression.createEqual(left, right);
+ }
+ |
+ "<>" right = comparisonExpression()
+ {
+ left = ComparisonExpression.createNotEqual(left, right);
+ }
+ |
+ LOOKAHEAD(2)
+ <IS> <NULL>
+ {
+ left = ComparisonExpression.createIsNull(left);
+ }
+ |
+ <IS> <NOT> <NULL>
+ {
+ left = ComparisonExpression.createIsNotNull(left);
+ }
+ )*
+ )
+ {
+ return left;
+ }
+}
+
+Expression comparisonExpression() :
+{
+ Expression left;
+ Expression right;
+ Expression low;
+ Expression high;
+ String t, u;
+ boolean not;
+ ArrayList list;
+}
+{
+ (
+ left = addExpression()
+ (
+
+ ">" right = addExpression()
+ {
+ left = ComparisonExpression.createGreaterThan(left, right);
+ }
+ |
+ ">=" right = addExpression()
+ {
+ left = ComparisonExpression.createGreaterThanEqual(left, right);
+ }
+ |
+ "<" right = addExpression()
+ {
+ left = ComparisonExpression.createLessThan(left, right);
+ }
+ |
+ "<=" right = addExpression()
+ {
+ left = ComparisonExpression.createLessThanEqual(left, right);
+ }
+ |
+ {
+ u=null;
+ }
+ <LIKE> t = stringLitteral()
+ [ <ESCAPE> u = stringLitteral() ]
+ {
+ left = ComparisonExpression.createLike(left, t, u);
+ }
+ |
+ LOOKAHEAD(2)
+ {
+ u=null;
+ }
+ <NOT> <LIKE> t = stringLitteral() [ <ESCAPE> u = stringLitteral() ]
+ {
+ left = ComparisonExpression.createNotLike(left, t, u);
+ }
+ |
+ <BETWEEN> low = addExpression() <AND> high = addExpression()
+ {
+ left = ComparisonExpression.createBetween(left, low, high);
+ }
+ |
+ LOOKAHEAD(2)
+ <NOT> <BETWEEN> low = addExpression() <AND> high = addExpression()
+ {
+ left = ComparisonExpression.createNotBetween(left, low, high);
+ }
+ |
+ <IN>
+ "("
+ t = stringLitteral()
+ {
+ list = new ArrayList();
+ list.add( t );
+ }
+ (
+ ","
+ t = stringLitteral()
+ {
+ list.add( t );
+ }
+
+ )*
+ ")"
+ {
+ left = ComparisonExpression.createInFilter(left, list);
+ }
+ |
+ LOOKAHEAD(2)
+ <NOT> <IN>
+ "("
+ t = stringLitteral()
+ {
+ list = new ArrayList();
+ list.add( t );
+ }
+ (
+ ","
+ t = stringLitteral()
+ {
+ list.add( t );
+ }
+
+ )*
+ ")"
+ {
+ left = ComparisonExpression.createNotInFilter(left, list);
+ }
+
+ )*
+ )
+ {
+ return left;
+ }
+}
+
+Expression addExpression() :
+{
+ Expression left;
+ Expression right;
+}
+{
+ left = multExpr()
+ (
+ LOOKAHEAD( ("+"|"-") multExpr())
+ (
+ "+" right = multExpr()
+ {
+ left = ArithmeticExpression.createPlus(left, right);
+ }
+ |
+ "-" right = multExpr()
+ {
+ left = ArithmeticExpression.createMinus(left, right);
+ }
+ )
+
+ )*
+ {
+ return left;
+ }
+}
+
+Expression multExpr() :
+{
+ Expression left;
+ Expression right;
+}
+{
+ left = unaryExpr()
+ (
+ "*" right = unaryExpr()
+ {
+ left = ArithmeticExpression.createMultiply(left, right);
+ }
+ |
+ "/" right = unaryExpr()
+ {
+ left = ArithmeticExpression.createDivide(left, right);
+ }
+ |
+ "%" right = unaryExpr()
+ {
+ left = ArithmeticExpression.createMod(left, right);
+ }
+
+ )*
+ {
+ return left;
+ }
+}
+
+
+Expression unaryExpr() :
+{
+ String s=null;
+ Expression left=null;
+}
+{
+ (
+ LOOKAHEAD( "+" unaryExpr() )
+ "+" left=unaryExpr()
+ |
+ "-" left=unaryExpr()
+ {
+ left = UnaryExpression.createNegate(left);
+ }
+ |
+ <NOT> left=unaryExpr()
+ {
+ left = UnaryExpression.createNOT( asBooleanExpression(left) );
+ }
+ |
+ <XPATH> s=stringLitteral()
+ {
+ left = UnaryExpression.createXPath( s );
+ }
+ |
+ <XQUERY> s=stringLitteral()
+ {
+ left = UnaryExpression.createXQuery( s );
+ }
+ |
+ left = primaryExpr()
+ )
+ {
+ return left;
+ }
+
+}
+
+Expression primaryExpr() :
+{
+ Expression left=null;
+}
+{
+ (
+ left = literal()
+ |
+ left = variable()
+ |
+ "(" left = orExpression() ")"
+ )
+ {
+ return left;
+ }
+}
+
+
+
+ConstantExpression literal() :
+{
+ Token t;
+ String s;
+ ConstantExpression left=null;
+}
+{
+ (
+ (
+ s = stringLitteral()
+ {
+ left = new ConstantExpression(s);
+ }
+ )
+ |
+ (
+ t = <DECIMAL_LITERAL>
+ {
+ left = ConstantExpression.createFromDecimal(t.image);
+ }
+ )
+ |
+ (
+ t = <HEX_LITERAL>
+ {
+ left = ConstantExpression.createFromHex(t.image);
+ }
+ )
+ |
+ (
+ t = <OCTAL_LITERAL>
+ {
+ left = ConstantExpression.createFromOctal(t.image);
+ }
+ )
+ |
+ (
+ t = <FLOATING_POINT_LITERAL>
+ {
+ left = ConstantExpression.createFloat(t.image);
+ }
+ )
+ |
+ (
+ <TRUE>
+ {
+ left = ConstantExpression.TRUE;
+ }
+ )
+ |
+ (
+ <FALSE>
+ {
+ left = ConstantExpression.FALSE;
+ }
+ )
+ |
+ (
+ <NULL>
+ {
+ left = ConstantExpression.NULL;
+ }
+ )
+ )
+ {
+ return left;
+ }
+}
+
+String stringLitteral() :
+{
+ Token t;
+ StringBuffer rc = new StringBuffer();
+ boolean first=true;
+}
+{
+ t = <STRING_LITERAL>
+ {
+ // Decode the sting value.
+ String image = t.image;
+ for( int i=1; i < image.length()-1; i++ ) {
+ char c = image.charAt(i);
+ if( c == '\'' )
+ i++;
+ rc.append(c);
+ }
+ return rc.toString();
+ }
+}
+
+PropertyExpression variable() :
+{
+ Token t;
+ PropertyExpression left=null;
+}
+{
+ (
+ t = <ID>
+ {
+ left = new PropertyExpression(t.image);
+ }
+ )
+ {
+ return left;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 63cc57b05e..24f61b2426 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -25,6 +25,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.ack.TxAck; import org.apache.qpid.server.ack.UnacknowledgedMessage; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; @@ -41,6 +43,8 @@ import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.txn.TxnBuffer; import java.util.*; +import java.util.Set; +import java.util.HashSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -102,6 +106,8 @@ public class AMQChannel private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory(); + private Set<Long> _browsedAcks = new HashSet<Long>(); + public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges) throws AMQException { @@ -111,7 +117,7 @@ public class AMQChannel _messageStore = messageStore; _exchanges = exchanges; // by default the session is non-transactional - _txnContext = new NonTransactionalContext(_messageStore, this, _returnMessages); + _txnContext = new NonTransactionalContext(_messageStore, this, _returnMessages, _browsedAcks); } /** @@ -311,13 +317,14 @@ public class AMQChannel * @param tag the tag chosen by the client (if null, server will generate one) * @param queue the queue to subscribe to * @param session the protocol session of the subscriber + * @param noLocal * @return the consumer tag. This is returned to the subscriber and used in * subsequent unsubscribe requests * @throws ConsumerTagNotUniqueException if the tag is not unique * @throws AMQException if something goes wrong */ - public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks) - throws AMQException, ConsumerTagNotUniqueException + public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks, + FieldTable filters, boolean noLocal) throws AMQException, ConsumerTagNotUniqueException { if (tag == null) { @@ -328,7 +335,7 @@ public class AMQChannel throw new ConsumerTagNotUniqueException(); } - queue.registerProtocolSession(session, _channelId, tag, acks); + queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal); _consumerTag2QueueMap.put(tag, queue); return tag; } @@ -524,6 +531,12 @@ public class AMQChannel return _unacknowledgedMessageMap; } + public void addUnacknowledgedBrowsedMessage(AMQMessage msg, long deliveryTag, String consumerTag, AMQQueue queue) + { + _browsedAcks.add(deliveryTag); + addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); + } + private void checkSuspension() { boolean suspend; diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java new file mode 100644 index 0000000000..c536f77dde --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java @@ -0,0 +1,219 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.AMQException; + +import javax.jms.JMSException; + +/** + * An expression which performs an operation on two expression values + * + * @version $Revision$ + */ +public abstract class ArithmeticExpression extends BinaryExpression { + + protected static final int INTEGER = 1; + protected static final int LONG = 2; + protected static final int DOUBLE = 3; + + /** + * @param left + * @param right + */ + public ArithmeticExpression(Expression left, Expression right) { + super(left, right); + } + + public static Expression createPlus(Expression left, Expression right) { + return new ArithmeticExpression(left, right) { + protected Object evaluate(Object lvalue, Object rvalue) { + if (lvalue instanceof String) { + String text = (String) lvalue; + String answer = text + rvalue; + return answer; + } + else if (lvalue instanceof Number) { + return plus((Number) lvalue, asNumber(rvalue)); + } + throw new RuntimeException("Cannot call plus operation on: " + lvalue + " and: " + rvalue); + } + + public String getExpressionSymbol() + { + return "+"; + } + }; + } + + public static Expression createMinus(Expression left, Expression right) { + return new ArithmeticExpression(left, right) { + protected Object evaluate(Object lvalue, Object rvalue) { + if (lvalue instanceof Number) { + return minus((Number) lvalue, asNumber(rvalue)); + } + throw new RuntimeException("Cannot call minus operation on: " + lvalue + " and: " + rvalue); + } + + public String getExpressionSymbol() { + return "-"; + } + }; + } + + public static Expression createMultiply(Expression left, Expression right) { + return new ArithmeticExpression(left, right) { + + protected Object evaluate(Object lvalue, Object rvalue) { + if (lvalue instanceof Number) { + return multiply((Number) lvalue, asNumber(rvalue)); + } + throw new RuntimeException("Cannot call multiply operation on: " + lvalue + " and: " + rvalue); + } + + public String getExpressionSymbol() { + return "*"; + } + }; + } + + public static Expression createDivide(Expression left, Expression right) { + return new ArithmeticExpression(left, right) { + + protected Object evaluate(Object lvalue, Object rvalue) { + if (lvalue instanceof Number) { + return divide((Number) lvalue, asNumber(rvalue)); + } + throw new RuntimeException("Cannot call divide operation on: " + lvalue + " and: " + rvalue); + } + + public String getExpressionSymbol() { + return "/"; + } + }; + } + + public static Expression createMod(Expression left, Expression right) { + return new ArithmeticExpression(left, right) { + + protected Object evaluate(Object lvalue, Object rvalue) { + if (lvalue instanceof Number) { + return mod((Number) lvalue, asNumber(rvalue)); + } + throw new RuntimeException("Cannot call mod operation on: " + lvalue + " and: " + rvalue); + } + + public String getExpressionSymbol() { + return "%"; + } + }; + } + + protected Number plus(Number left, Number right) { + switch (numberType(left, right)) { + case INTEGER: + return new Integer(left.intValue() + right.intValue()); + case LONG: + return new Long(left.longValue() + right.longValue()); + default: + return new Double(left.doubleValue() + right.doubleValue()); + } + } + + protected Number minus(Number left, Number right) { + switch (numberType(left, right)) { + case INTEGER: + return new Integer(left.intValue() - right.intValue()); + case LONG: + return new Long(left.longValue() - right.longValue()); + default: + return new Double(left.doubleValue() - right.doubleValue()); + } + } + + protected Number multiply(Number left, Number right) { + switch (numberType(left, right)) { + case INTEGER: + return new Integer(left.intValue() * right.intValue()); + case LONG: + return new Long(left.longValue() * right.longValue()); + default: + return new Double(left.doubleValue() * right.doubleValue()); + } + } + + protected Number divide(Number left, Number right) { + return new Double(left.doubleValue() / right.doubleValue()); + } + + protected Number mod(Number left, Number right) { + return new Double(left.doubleValue() % right.doubleValue()); + } + + private int numberType(Number left, Number right) { + if (isDouble(left) || isDouble(right)) { + return DOUBLE; + } + else if (left instanceof Long || right instanceof Long) { + return LONG; + } + else { + return INTEGER; + } + } + + private boolean isDouble(Number n) { + return n instanceof Float || n instanceof Double; + } + + protected Number asNumber(Object value) { + if (value instanceof Number) { + return (Number) value; + } + else { + throw new RuntimeException("Cannot convert value: " + value + " into a number"); + } + } + + public Object evaluate(AMQMessage message) throws AMQException + { + Object lvalue = left.evaluate(message); + if (lvalue == null) { + return null; + } + Object rvalue = right.evaluate(message); + if (rvalue == null) { + return null; + } + return evaluate(lvalue, rvalue); + } + + + /** + * @param lvalue + * @param rvalue + * @return + */ + abstract protected Object evaluate(Object lvalue, Object rvalue); + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java new file mode 100644 index 0000000000..4256ab9189 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java @@ -0,0 +1,100 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + + + +/** + * An expression which performs an operation on two expression values. + * + * @version $Revision$ + */ +abstract public class BinaryExpression implements Expression { + protected Expression left; + protected Expression right; + + public BinaryExpression(Expression left, Expression right) { + this.left = left; + this.right = right; + } + + public Expression getLeft() { + return left; + } + + public Expression getRight() { + return right; + } + + + /** + * @see java.lang.Object#toString() + */ + public String toString() { + return "(" + left.toString() + " " + getExpressionSymbol() + " " + right.toString() + ")"; + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#hashCode() + */ + public int hashCode() { + return toString().hashCode(); + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#equals(java.lang.Object) + */ + public boolean equals(Object o) { + + if (o == null || !this.getClass().equals(o.getClass())) { + return false; + } + return toString().equals(o.toString()); + + } + + /** + * Returns the symbol that represents this binary expression. For example, addition is + * represented by "+" + * + * @return + */ + abstract public String getExpressionSymbol(); + + /** + * @param expression + */ + public void setRight(Expression expression) { + right = expression; + } + + /** + * @param expression + */ + public void setLeft(Expression expression) { + left = expression; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java new file mode 100644 index 0000000000..de71e95049 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java @@ -0,0 +1,47 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.jms.JMSMessage; +import org.apache.qpid.AMQException; + +import javax.jms.JMSException; + + +/** + * A BooleanExpression is an expression that always + * produces a Boolean result. + * + * @version $Revision$ + */ +public interface BooleanExpression extends Expression +{ + + /** + * @param message + * @return true if the expression evaluates to Boolean.TRUE. + * @throws JMSException + */ + public boolean matches(AMQMessage message) throws AMQException; + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java new file mode 100644 index 0000000000..07391098ce --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java @@ -0,0 +1,467 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.jms.JMSMessage; +import org.apache.qpid.AMQException; + +import java.util.HashSet; +import java.util.List; +import java.util.regex.Pattern; + +import javax.jms.JMSException; + +/** + * A filter performing a comparison of two objects + * + * @version $Revision$ + */ +public abstract class ComparisonExpression extends BinaryExpression implements BooleanExpression { + + public static BooleanExpression createBetween(Expression value, Expression left, Expression right) { + return LogicExpression.createAND(createGreaterThanEqual(value, left), createLessThanEqual(value, right)); + } + + public static BooleanExpression createNotBetween(Expression value, Expression left, Expression right) { + return LogicExpression.createOR(createLessThan(value, left), createGreaterThan(value, right)); + } + + static final private HashSet REGEXP_CONTROL_CHARS = new HashSet(); + + static { + REGEXP_CONTROL_CHARS.add(new Character('.')); + REGEXP_CONTROL_CHARS.add(new Character('\\')); + REGEXP_CONTROL_CHARS.add(new Character('[')); + REGEXP_CONTROL_CHARS.add(new Character(']')); + REGEXP_CONTROL_CHARS.add(new Character('^')); + REGEXP_CONTROL_CHARS.add(new Character('$')); + REGEXP_CONTROL_CHARS.add(new Character('?')); + REGEXP_CONTROL_CHARS.add(new Character('*')); + REGEXP_CONTROL_CHARS.add(new Character('+')); + REGEXP_CONTROL_CHARS.add(new Character('{')); + REGEXP_CONTROL_CHARS.add(new Character('}')); + REGEXP_CONTROL_CHARS.add(new Character('|')); + REGEXP_CONTROL_CHARS.add(new Character('(')); + REGEXP_CONTROL_CHARS.add(new Character(')')); + REGEXP_CONTROL_CHARS.add(new Character(':')); + REGEXP_CONTROL_CHARS.add(new Character('&')); + REGEXP_CONTROL_CHARS.add(new Character('<')); + REGEXP_CONTROL_CHARS.add(new Character('>')); + REGEXP_CONTROL_CHARS.add(new Character('=')); + REGEXP_CONTROL_CHARS.add(new Character('!')); + } + + static class LikeExpression extends UnaryExpression implements BooleanExpression { + + Pattern likePattern; + + /** + * @param right + */ + public LikeExpression(Expression right, String like, int escape) { + super(right); + + StringBuffer regexp = new StringBuffer(like.length() * 2); + regexp.append("\\A"); // The beginning of the input + for (int i = 0; i < like.length(); i++) { + char c = like.charAt(i); + if (escape == (0xFFFF & c)) { + i++; + if (i >= like.length()) { + // nothing left to escape... + break; + } + + char t = like.charAt(i); + regexp.append("\\x"); + regexp.append(Integer.toHexString(0xFFFF & t)); + } + else if (c == '%') { + regexp.append(".*?"); // Do a non-greedy match + } + else if (c == '_') { + regexp.append("."); // match one + } + else if (REGEXP_CONTROL_CHARS.contains(new Character(c))) { + regexp.append("\\x"); + regexp.append(Integer.toHexString(0xFFFF & c)); + } + else { + regexp.append(c); + } + } + regexp.append("\\z"); // The end of the input + + likePattern = Pattern.compile(regexp.toString(), Pattern.DOTALL); + } + + /** + * org.apache.activemq.filter.UnaryExpression#getExpressionSymbol() + */ + public String getExpressionSymbol() { + return "LIKE"; + } + + /** + * org.apache.activemq.filter.Expression#evaluate(MessageEvaluationContext) + */ + public Object evaluate(AMQMessage message) throws AMQException { + + Object rv = this.getRight().evaluate(message); + + if (rv == null) { + return null; + } + + if (!(rv instanceof String)) { + return Boolean.FALSE; + //throw new RuntimeException("LIKE can only operate on String identifiers. LIKE attemped on: '" + rv.getClass()); + } + + return likePattern.matcher((String) rv).matches() ? Boolean.TRUE : Boolean.FALSE; + } + + public boolean matches(AMQMessage message) throws AMQException { + Object object = evaluate(message); + return object!=null && object==Boolean.TRUE; + } + } + + public static BooleanExpression createLike(Expression left, String right, String escape) { + if (escape != null && escape.length() != 1) { + throw new RuntimeException("The ESCAPE string litteral is invalid. It can only be one character. Litteral used: " + escape); + } + int c = -1; + if (escape != null) { + c = 0xFFFF & escape.charAt(0); + } + + return new LikeExpression(left, right, c); + } + + public static BooleanExpression createNotLike(Expression left, String right, String escape) { + return UnaryExpression.createNOT(createLike(left, right, escape)); + } + + public static BooleanExpression createInFilter(Expression left, List elements) { + + if( !(left instanceof PropertyExpression) ) + throw new RuntimeException("Expected a property for In expression, got: "+left); + return UnaryExpression.createInExpression((PropertyExpression)left, elements, false); + + } + + public static BooleanExpression createNotInFilter(Expression left, List elements) { + + if( !(left instanceof PropertyExpression) ) + throw new RuntimeException("Expected a property for In expression, got: "+left); + return UnaryExpression.createInExpression((PropertyExpression)left, elements, true); + + } + + public static BooleanExpression createIsNull(Expression left) { + return doCreateEqual(left, ConstantExpression.NULL); + } + + public static BooleanExpression createIsNotNull(Expression left) { + return UnaryExpression.createNOT(doCreateEqual(left, ConstantExpression.NULL)); + } + + public static BooleanExpression createNotEqual(Expression left, Expression right) { + return UnaryExpression.createNOT(createEqual(left, right)); + } + + public static BooleanExpression createEqual(Expression left, Expression right) { + checkEqualOperand(left); + checkEqualOperand(right); + checkEqualOperandCompatability(left, right); + return doCreateEqual(left, right); + } + + private static BooleanExpression doCreateEqual(Expression left, Expression right) { + return new ComparisonExpression(left, right) { + + public Object evaluate(AMQMessage message) throws AMQException { + Object lv = left.evaluate(message); + Object rv = right.evaluate(message); + + // Iff one of the values is null + if (lv == null ^ rv == null) { + return Boolean.FALSE; + } + if (lv == rv || lv.equals(rv)) { + return Boolean.TRUE; + } + if( lv instanceof Comparable && rv instanceof Comparable ) { + return compare((Comparable)lv, (Comparable)rv); + } + return Boolean.FALSE; + } + + protected boolean asBoolean(int answer) { + return answer == 0; + } + + public String getExpressionSymbol() { + return "="; + } + }; + } + + public static BooleanExpression createGreaterThan(final Expression left, final Expression right) { + checkLessThanOperand(left); + checkLessThanOperand(right); + return new ComparisonExpression(left, right) { + protected boolean asBoolean(int answer) { + return answer > 0; + } + + public String getExpressionSymbol() { + return ">"; + } + }; + } + + public static BooleanExpression createGreaterThanEqual(final Expression left, final Expression right) { + checkLessThanOperand(left); + checkLessThanOperand(right); + return new ComparisonExpression(left, right) { + protected boolean asBoolean(int answer) { + return answer >= 0; + } + + public String getExpressionSymbol() { + return ">="; + } + }; + } + + public static BooleanExpression createLessThan(final Expression left, final Expression right) { + checkLessThanOperand(left); + checkLessThanOperand(right); + return new ComparisonExpression(left, right) { + + protected boolean asBoolean(int answer) { + return answer < 0; + } + + public String getExpressionSymbol() { + return "<"; + } + + }; + } + + public static BooleanExpression createLessThanEqual(final Expression left, final Expression right) { + checkLessThanOperand(left); + checkLessThanOperand(right); + return new ComparisonExpression(left, right) { + + protected boolean asBoolean(int answer) { + return answer <= 0; + } + + public String getExpressionSymbol() { + return "<="; + } + }; + } + + /** + * Only Numeric expressions can be used in >, >=, < or <= expressions.s + * + * @param expr + */ + public static void checkLessThanOperand(Expression expr ) { + if( expr instanceof ConstantExpression ) { + Object value = ((ConstantExpression)expr).getValue(); + if( value instanceof Number ) + return; + + // Else it's boolean or a String.. + throw new RuntimeException("Value '"+expr+"' cannot be compared."); + } + if( expr instanceof BooleanExpression ) { + throw new RuntimeException("Value '"+expr+"' cannot be compared."); + } + } + + /** + * Validates that the expression can be used in == or <> expression. + * Cannot not be NULL TRUE or FALSE litterals. + * + * @param expr + */ + public static void checkEqualOperand(Expression expr ) { + if( expr instanceof ConstantExpression ) { + Object value = ((ConstantExpression)expr).getValue(); + if( value == null ) + throw new RuntimeException("'"+expr+"' cannot be compared."); + } + } + + /** + * + * @param left + * @param right + */ + private static void checkEqualOperandCompatability(Expression left, Expression right) { + if( left instanceof ConstantExpression && right instanceof ConstantExpression ) { + if( left instanceof BooleanExpression && !(right instanceof BooleanExpression) ) + throw new RuntimeException("'"+left+"' cannot be compared with '"+right+"'"); + } + } + + + + /** + * @param left + * @param right + */ + public ComparisonExpression(Expression left, Expression right) { + super(left, right); + } + + public Object evaluate(AMQMessage message) throws AMQException + { + Comparable lv = (Comparable) left.evaluate(message); + if (lv == null) { + return null; + } + Comparable rv = (Comparable) right.evaluate(message); + if (rv == null) { + return null; + } + return compare(lv, rv); + } + + protected Boolean compare(Comparable lv, Comparable rv) { + Class lc = lv.getClass(); + Class rc = rv.getClass(); + // If the the objects are not of the same type, + // try to convert up to allow the comparison. + if (lc != rc) { + if (lc == Byte.class) { + if (rc == Short.class) { + lv = new Short(((Number) lv).shortValue()); + } + else if (rc == Integer.class) { + lv = new Integer(((Number) lv).intValue()); + } + else if (rc == Long.class) { + lv = new Long(((Number) lv).longValue()); + } + else if (rc == Float.class) { + lv = new Float(((Number) lv).floatValue()); + } + else if (rc == Double.class) { + lv = new Double(((Number) lv).doubleValue()); + } + else { + return Boolean.FALSE; + } + } else if (lc == Short.class) { + if (rc == Integer.class) { + lv = new Integer(((Number) lv).intValue()); + } + else if (rc == Long.class) { + lv = new Long(((Number) lv).longValue()); + } + else if (rc == Float.class) { + lv = new Float(((Number) lv).floatValue()); + } + else if (rc == Double.class) { + lv = new Double(((Number) lv).doubleValue()); + } + else { + return Boolean.FALSE; + } + } else if (lc == Integer.class) { + if (rc == Long.class) { + lv = new Long(((Number) lv).longValue()); + } + else if (rc == Float.class) { + lv = new Float(((Number) lv).floatValue()); + } + else if (rc == Double.class) { + lv = new Double(((Number) lv).doubleValue()); + } + else { + return Boolean.FALSE; + } + } + else if (lc == Long.class) { + if (rc == Integer.class) { + rv = new Long(((Number) rv).longValue()); + } + else if (rc == Float.class) { + lv = new Float(((Number) lv).floatValue()); + } + else if (rc == Double.class) { + lv = new Double(((Number) lv).doubleValue()); + } + else { + return Boolean.FALSE; + } + } + else if (lc == Float.class) { + if (rc == Integer.class) { + rv = new Float(((Number) rv).floatValue()); + } + else if (rc == Long.class) { + rv = new Float(((Number) rv).floatValue()); + } + else if (rc == Double.class) { + lv = new Double(((Number) lv).doubleValue()); + } + else { + return Boolean.FALSE; + } + } + else if (lc == Double.class) { + if (rc == Integer.class) { + rv = new Double(((Number) rv).doubleValue()); + } + else if (rc == Long.class) { + rv = new Double(((Number) rv).doubleValue()); + } + else if (rc == Float.class) { + rv = new Float(((Number) rv).doubleValue()); + } + else { + return Boolean.FALSE; + } + } + else + return Boolean.FALSE; + } + return asBoolean(lv.compareTo(rv)) ? Boolean.TRUE : Boolean.FALSE; + } + + protected abstract boolean asBoolean(int answer); + + public boolean matches(AMQMessage message) throws AMQException { + Object object = evaluate(message); + return object!=null && object==Boolean.TRUE; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java new file mode 100644 index 0000000000..2cd305d4b1 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java @@ -0,0 +1,201 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.jms.JMSMessage; +import org.apache.qpid.AMQException; + +import java.math.BigDecimal; + +import javax.jms.JMSException; + +/** + * Represents a constant expression + * + * @version $Revision$ + */ +public class ConstantExpression implements Expression +{ + + static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression + { + public BooleanConstantExpression(Object value) + { + super(value); + } + + public boolean matches(AMQMessage message) throws AMQException + { + Object object = evaluate(message); + return object != null && object == Boolean.TRUE; + } + } + + public static final BooleanConstantExpression NULL = new BooleanConstantExpression(null); + public static final BooleanConstantExpression TRUE = new BooleanConstantExpression(Boolean.TRUE); + public static final BooleanConstantExpression FALSE = new BooleanConstantExpression(Boolean.FALSE); + + private Object value; + + public static ConstantExpression createFromDecimal(String text) + { + + // Strip off the 'l' or 'L' if needed. + if (text.endsWith("l") || text.endsWith("L")) + { + text = text.substring(0, text.length() - 1); + } + + Number value; + try + { + value = new Long(text); + } + catch (NumberFormatException e) + { + // The number may be too big to fit in a long. + value = new BigDecimal(text); + } + + long l = value.longValue(); + if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) + { + value = new Integer(value.intValue()); + } + return new ConstantExpression(value); + } + + public static ConstantExpression createFromHex(String text) + { + Number value = new Long(Long.parseLong(text.substring(2), 16)); + long l = value.longValue(); + if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) + { + value = new Integer(value.intValue()); + } + return new ConstantExpression(value); + } + + public static ConstantExpression createFromOctal(String text) + { + Number value = new Long(Long.parseLong(text, 8)); + long l = value.longValue(); + if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) + { + value = new Integer(value.intValue()); + } + return new ConstantExpression(value); + } + + public static ConstantExpression createFloat(String text) + { + Number value = new Double(text); + return new ConstantExpression(value); + } + + public ConstantExpression(Object value) + { + this.value = value; + } + + public Object evaluate(AMQMessage message) throws AMQException + { + return value; + } + + public Object getValue() + { + return value; + } + + /** + * @see java.lang.Object#toString() + */ + public String toString() + { + if (value == null) + { + return "NULL"; + } + if (value instanceof Boolean) + { + return ((Boolean) value).booleanValue() ? "TRUE" : "FALSE"; + } + if (value instanceof String) + { + return encodeString((String) value); + } + return value.toString(); + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#hashCode() + */ + public int hashCode() + { + return toString().hashCode(); + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#equals(java.lang.Object) + */ + public boolean equals(Object o) + { + + if (o == null || !this.getClass().equals(o.getClass())) + { + return false; + } + return toString().equals(o.toString()); + + } + + + /** + * Encodes the value of string so that it looks like it would look like + * when it was provided in a selector. + * + * @param s + * @return + */ + public static String encodeString(String s) + { + StringBuffer b = new StringBuffer(); + b.append('\''); + for (int i = 0; i < s.length(); i++) + { + char c = s.charAt(i); + if (c == '\'') + { + b.append(c); + } + b.append(c); + } + b.append('\''); + return b.toString(); + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java new file mode 100644 index 0000000000..3b5debd3ee --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java @@ -0,0 +1,44 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.jms.JMSMessage; +import org.apache.qpid.AMQException; + +import javax.jms.JMSException; + + +/** + * Represents an expression + * + * @version $Revision$ + */ +public interface Expression +{ + + /** + * @return the value of this expression + */ + public Object evaluate(AMQMessage message) throws AMQException; + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java new file mode 100644 index 0000000000..c82de9fa15 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + +import org.apache.qpid.server.queue.AMQMessage; + +public interface FilterManager +{ + void add(MessageFilter filter); + + void remove(MessageFilter filter); + + boolean allAllow(AMQMessage msg); + + boolean hasFilters(); +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java new file mode 100644 index 0000000000..49f99132ef --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.filter; + +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.AMQException; +import org.apache.qpid.common.AMQPFilterTypes; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; + +import java.util.Iterator; + + +public class FilterManagerFactory +{ + //private final static Logger _logger = LoggerFactory.getLogger(FilterManagerFactory.class); + private final static org.apache.log4j.Logger _logger = org.apache.log4j.Logger.getLogger(FilterManagerFactory.class); + + //fixme move to a common class so it can be refered to from client code. + + public static FilterManager createManager(FieldTable filters) throws AMQException + { + FilterManager manager = null; + + if (filters != null) + { + + manager = new SimpleFilterManager(); + + Iterator it = filters.keySet().iterator(); + _logger.info("Processing filters:"); + while (it.hasNext()) + { + String key = (String) it.next(); + _logger.info("filter:" + key); + if (key.equals(AMQPFilterTypes.JMS_SELECTOR.getValue())) + { + String selector = (String) filters.get(key); + + if (selector != null && !selector.equals("")) + { + manager.add(new JMSSelectorFilter(selector)); + } + } + + if (key.equals(AMQPFilterTypes.NO_CONSUME.getValue())) + { + manager.add(new NoConsumerFilter()); + } + + } + + //If we added no filters don't bear the overhead of having an filter manager + if (!manager.hasFilters()) + { + manager = null; + } + } + else + { + _logger.info("No Filters found."); + } + + + return manager; + + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java new file mode 100644 index 0000000000..5f505fbeba --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.filter; + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.filter.jms.selector.SelectorParser; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInvalidSelectorException; +import org.apache.log4j.Logger; + + +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; + +public class JMSSelectorFilter implements MessageFilter +{ + private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class); + + private String _selector; + private BooleanExpression _matcher; + + public JMSSelectorFilter(String selector) throws AMQException + { + _selector = selector; + _logger.info("Created JMSSelectorFilter with selector:" + _selector); + + + try + { + _matcher = new SelectorParser().parse(selector); + } + catch (InvalidSelectorException e) + { + // fixme + // Is this the correct way of throwing exception + throw new AMQInvalidSelectorException(e.getMessage()); + } + + } + + public boolean matches(AMQMessage message) + { + try + { + boolean match = _matcher.matches(message); + _logger.info(message + " match(" + match + ") selector(" + System.identityHashCode(_selector) + "):" + _selector); + return match; + } + catch (AMQException e) + { + //fixme this needs to be sorted.. it shouldn't happen + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + return false; + } + + public String getSelector() + { + return _selector; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java new file mode 100644 index 0000000000..e6ad98cb8b --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java @@ -0,0 +1,96 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.jms.JMSMessage; +import org.apache.qpid.AMQException; + +import javax.jms.JMSException; + +/** + * A filter performing a comparison of two objects + * + * @version $Revision$ + */ +public abstract class LogicExpression extends BinaryExpression implements BooleanExpression { + + public static BooleanExpression createOR(BooleanExpression lvalue, BooleanExpression rvalue) { + return new LogicExpression(lvalue, rvalue) { + + public Object evaluate(AMQMessage message) throws AMQException { + + Boolean lv = (Boolean) left.evaluate(message); + // Can we do an OR shortcut?? + if (lv !=null && lv.booleanValue()) { + return Boolean.TRUE; + } + + Boolean rv = (Boolean) right.evaluate(message); + return rv==null ? null : rv; + } + + public String getExpressionSymbol() { + return "OR"; + } + }; + } + + public static BooleanExpression createAND(BooleanExpression lvalue, BooleanExpression rvalue) { + return new LogicExpression(lvalue, rvalue) { + + public Object evaluate(AMQMessage message) throws AMQException { + + Boolean lv = (Boolean) left.evaluate(message); + + // Can we do an AND shortcut?? + if (lv == null) + return null; + if (!lv.booleanValue()) { + return Boolean.FALSE; + } + + Boolean rv = (Boolean) right.evaluate(message); + return rv == null ? null : rv; + } + + public String getExpressionSymbol() { + return "AND"; + } + }; + } + + /** + * @param left + * @param right + */ + public LogicExpression(BooleanExpression left, BooleanExpression right) { + super(left, right); + } + + abstract public Object evaluate(AMQMessage message) throws AMQException; + + public boolean matches(AMQMessage message) throws AMQException { + Object object = evaluate(message); + return object!=null && object==Boolean.TRUE; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java new file mode 100644 index 0000000000..b8ca75d209 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.filter; + +import org.apache.qpid.server.queue.AMQMessage; + +import javax.jms.JMSException; + +public interface MessageFilter +{ + boolean matches(AMQMessage message) throws JMSException; +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java new file mode 100644 index 0000000000..5c58b73ea3 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.filter; + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.AMQException; +import org.apache.log4j.Logger; + + +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; + +public class NoConsumerFilter implements MessageFilter +{ + private final static Logger _logger = org.apache.log4j.Logger.getLogger(NoConsumerFilter.class); + + + public NoConsumerFilter() throws AMQException + { + _logger.info("Created NoConsumerFilter"); + } + + public boolean matches(AMQMessage message) + { + return true; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java new file mode 100644 index 0000000000..7d6a98df84 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java @@ -0,0 +1,305 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + +import java.io.IOException; +import java.util.HashMap; + +import javax.jms.DeliveryMode; +import javax.jms.JMSException; + +//import org.apache.activemq.command.ActiveMQDestination; +//import org.apache.activemq.command.Message; +//import org.apache.activemq.command.TransactionId; +//import org.apache.activemq.util.JMSExceptionSupport; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.jms.JMSMessage; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.log4j.Logger; + +/** + * Represents a property expression + * + * @version $Revision$ + */ +public class PropertyExpression implements Expression +{ + + interface SubExpression + { + public Object evaluate(AMQMessage message) throws AMQException; + } + + interface JMSExpression + { + public abstract Object evaluate(JMSMessage message); + } + + static class SubJMSExpression implements SubExpression + { + JMSExpression _expression; + + SubJMSExpression(JMSExpression expression) + { + _expression = expression; + } + + + public Object evaluate(AMQMessage message) throws AMQException + { + JMSMessage msg = (JMSMessage) message.getDecodedMessage(AMQMessage.JMS_MESSAGE); + if (msg != null) + { + return _expression.evaluate(msg); + } + else + { + return null; + } + } + } + + private final static Logger _logger = org.apache.log4j.Logger.getLogger(PropertyExpression.class); + + + static final private HashMap JMS_PROPERTY_EXPRESSIONS = new HashMap(); + + static + { + JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getJMSDestination(); + } + } + )); +// +// public Object evaluate(AMQMessage message) +// { +// //fixme +// +// +//// AMQDestination dest = message.getOriginalDestination(); +//// if (dest == null) +//// { +//// dest = message.getDestination(); +//// } +//// if (dest == null) +//// { +//// return null; +//// } +//// return dest.toString(); +// return ""; +// } +// }); + JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getJMSReplyTo(); + } + }) + ); + + JMS_PROPERTY_EXPRESSIONS.put("JMSType", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getJMSType(); + } + } + )); + + JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + try + { + Integer mode = new Integer(message.getAMQMessage().isPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + _logger.info("JMSDeliveryMode is :" + mode); + return mode; + } + catch (AMQException e) + { + //shouldn't happen + } + + return DeliveryMode.NON_PERSISTENT; + } + })); + + JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getJMSPriority(); + } + } + )); + + + JMS_PROPERTY_EXPRESSIONS.put("AMQMessageID", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getAMQMessage().getMessageId(); + } + } + )); + + JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getJMSTimestamp(); + } + } + )); + + JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getJMSCorrelationID(); + } + } + )); + + JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getJMSExpiration(); + } + } + )); + + JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getAMQMessage().isRedelivered(); + } + } + )); + + } + + private final String name; + private final SubExpression jmsPropertyExpression; + + public PropertyExpression(String name) + { + this.name = name; + jmsPropertyExpression = (SubExpression) JMS_PROPERTY_EXPRESSIONS.get(name); + } + + public Object evaluate(AMQMessage message) throws AMQException + { +// try +// { +// if (message.isDropped()) +// { +// return null; +// } + + if (jmsPropertyExpression != null) + { + return jmsPropertyExpression.evaluate(message); + } +// try + else + { + + BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; + + _logger.info("Looking up property:" + name); + _logger.info("Properties are:" + _properties.getHeaders().keySet()); + + return _properties.getHeaders().get(name); + } +// catch (IOException ioe) +// { +// JMSException exception = new JMSException("Could not get property: " + name + " reason: " + ioe.getMessage()); +// exception.initCause(ioe); +// throw exception; +// } +// } +// catch (IOException e) +// { +// JMSException exception = new JMSException(e.getMessage()); +// exception.initCause(e); +// throw exception; +// } + + } + + public String getName() + { + return name; + } + + + /** + * @see java.lang.Object#toString() + */ + public String toString() + { + return name; + } + + /** + * @see java.lang.Object#hashCode() + */ + public int hashCode() + { + return name.hashCode(); + } + + /** + * @see java.lang.Object#equals(java.lang.Object) + */ + public boolean equals(Object o) + { + + if (o == null || !this.getClass().equals(o.getClass())) + { + return false; + } + return name.equals(((PropertyExpression) o).name); + + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java b/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java new file mode 100644 index 0000000000..dc2c2c0e6c --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.filter; + +import org.apache.qpid.server.queue.AMQMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; +import java.util.concurrent.ConcurrentLinkedQueue; + +public class SimpleFilterManager implements FilterManager +{ + private final Logger _logger = LoggerFactory.getLogger(SimpleFilterManager.class); + + private final ConcurrentLinkedQueue<MessageFilter> _filters; + + public SimpleFilterManager() + { + _logger.debug("Creating SimpleFilterManager"); + _filters = new ConcurrentLinkedQueue<MessageFilter>(); + } + + public void add(MessageFilter filter) + { + _filters.add(filter); + } + + public void remove(MessageFilter filter) + { + _filters.remove(filter); + } + + public boolean allAllow(AMQMessage msg) + { + for (MessageFilter filter : _filters) + { + try + { + if (!filter.matches(msg)) + { + return false; + } + } + catch (JMSException e) + { + //fixme + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + return false; + } + } + return true; + } + + public boolean hasFilters() + { + return !_filters.isEmpty(); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java new file mode 100644 index 0000000000..abc56f04d0 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java @@ -0,0 +1,265 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.jms.JMSMessage; +import org.apache.qpid.AMQException; + +import java.math.BigDecimal; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; + +import javax.jms.JMSException; + +/** + * An expression which performs an operation on two expression values + * + * @version $Revision$ + */ +public abstract class UnaryExpression implements Expression { + + private static final BigDecimal BD_LONG_MIN_VALUE = BigDecimal.valueOf(Long.MIN_VALUE); + protected Expression right; + + public static Expression createNegate(Expression left) { + return new UnaryExpression(left) { + public Object evaluate(AMQMessage message) throws AMQException + { + Object rvalue = right.evaluate(message); + if (rvalue == null) { + return null; + } + if (rvalue instanceof Number) { + return negate((Number) rvalue); + } + return null; + } + + public String getExpressionSymbol() { + return "-"; + } + }; + } + + public static BooleanExpression createInExpression(PropertyExpression right, List elements, final boolean not) { + + // Use a HashSet if there are many elements. + Collection t; + if( elements.size()==0 ) + t=null; + else if( elements.size() < 5 ) + t = elements; + else { + t = new HashSet(elements); + } + final Collection inList = t; + + return new BooleanUnaryExpression(right) { + public Object evaluate(AMQMessage message) throws AMQException { + + Object rvalue = right.evaluate(message); + if (rvalue == null) { + return null; + } + if( rvalue.getClass()!=String.class ) + return null; + + if( (inList!=null && inList.contains(rvalue)) ^ not ) { + return Boolean.TRUE; + } else { + return Boolean.FALSE; + } + + } + + public String toString() { + StringBuffer answer = new StringBuffer(); + answer.append(right); + answer.append(" "); + answer.append(getExpressionSymbol()); + answer.append(" ( "); + + int count=0; + for (Iterator i = inList.iterator(); i.hasNext();) { + Object o = (Object) i.next(); + if( count!=0 ) { + answer.append(", "); + } + answer.append(o); + count++; + } + + answer.append(" )"); + return answer.toString(); + } + + public String getExpressionSymbol() { + if( not ) + return "NOT IN"; + else + return "IN"; + } + }; + } + + abstract static class BooleanUnaryExpression extends UnaryExpression implements BooleanExpression { + public BooleanUnaryExpression(Expression left) { + super(left); + } + + public boolean matches(AMQMessage message) throws AMQException { + Object object = evaluate(message); + return object!=null && object==Boolean.TRUE; + } + }; + + + public static BooleanExpression createNOT(BooleanExpression left) { + return new BooleanUnaryExpression(left) { + public Object evaluate(AMQMessage message) throws AMQException { + Boolean lvalue = (Boolean) right.evaluate(message); + if (lvalue == null) { + return null; + } + return lvalue.booleanValue() ? Boolean.FALSE : Boolean.TRUE; + } + + public String getExpressionSymbol() { + return "NOT"; + } + }; + } + + public static BooleanExpression createXPath(final String xpath) { + return new XPathExpression(xpath); + } + + public static BooleanExpression createXQuery(final String xpath) { + return new XQueryExpression(xpath); + } + + public static BooleanExpression createBooleanCast(Expression left) { + return new BooleanUnaryExpression(left) { + public Object evaluate(AMQMessage message) throws AMQException { + Object rvalue = right.evaluate(message); + if (rvalue == null) + return null; + if (!rvalue.getClass().equals(Boolean.class)) + return Boolean.FALSE; + return ((Boolean)rvalue).booleanValue() ? Boolean.TRUE : Boolean.FALSE; + } + + public String toString() { + return right.toString(); + } + + public String getExpressionSymbol() { + return ""; + } + }; + } + + private static Number negate(Number left) { + Class clazz = left.getClass(); + if (clazz == Integer.class) { + return new Integer(-left.intValue()); + } + else if (clazz == Long.class) { + return new Long(-left.longValue()); + } + else if (clazz == Float.class) { + return new Float(-left.floatValue()); + } + else if (clazz == Double.class) { + return new Double(-left.doubleValue()); + } + else if (clazz == BigDecimal.class) { + // We ussually get a big deciamal when we have Long.MIN_VALUE constant in the + // Selector. Long.MIN_VALUE is too big to store in a Long as a positive so we store it + // as a Big decimal. But it gets Negated right away.. to here we try to covert it back + // to a Long. + BigDecimal bd = (BigDecimal)left; + bd = bd.negate(); + + if( BD_LONG_MIN_VALUE.compareTo(bd)==0 ) { + return new Long(Long.MIN_VALUE); + } + return bd; + } + else { + throw new RuntimeException("Don't know how to negate: "+left); + } + } + + public UnaryExpression(Expression left) { + this.right = left; + } + + public Expression getRight() { + return right; + } + + public void setRight(Expression expression) { + right = expression; + } + + /** + * @see java.lang.Object#toString() + */ + public String toString() { + return "(" + getExpressionSymbol() + " " + right.toString() + ")"; + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#hashCode() + */ + public int hashCode() { + return toString().hashCode(); + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#equals(java.lang.Object) + */ + public boolean equals(Object o) { + + if (o == null || !this.getClass().equals(o.getClass())) { + return false; + } + return toString().equals(o.toString()); + + } + + /** + * Returns the symbol that represents this binary expression. For example, addition is + * represented by "+" + * + * @return + */ + abstract public String getExpressionSymbol(); + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java new file mode 100644 index 0000000000..85402e0781 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java @@ -0,0 +1,132 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +import javax.jms.JMSException; + +//import org.apache.activemq.command.Message; +//import org.apache.activemq.util.JMSExceptionSupport; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.AMQException; + +/** + * Used to evaluate an XPath Expression in a JMS selector. + */ +public final class XPathExpression implements BooleanExpression { + + private static final Log log = LogFactory.getLog(XPathExpression.class); + private static final String EVALUATOR_SYSTEM_PROPERTY = "org.apache.qpid.server.filter.XPathEvaluatorClassName"; + private static final String DEFAULT_EVALUATOR_CLASS_NAME=XalanXPathEvaluator.class.getName(); + + private static final Constructor EVALUATOR_CONSTRUCTOR; + + static { + String cn = System.getProperty(EVALUATOR_SYSTEM_PROPERTY, DEFAULT_EVALUATOR_CLASS_NAME); + Constructor m = null; + try { + try { + m = getXPathEvaluatorConstructor(cn); + } catch (Throwable e) { + log.warn("Invalid "+XPathEvaluator.class.getName()+" implementation: "+cn+", reason: "+e,e); + cn = DEFAULT_EVALUATOR_CLASS_NAME; + try { + m = getXPathEvaluatorConstructor(cn); + } catch (Throwable e2) { + log.error("Default XPath evaluator could not be loaded",e); + } + } + } finally { + EVALUATOR_CONSTRUCTOR = m; + } + } + + private static Constructor getXPathEvaluatorConstructor(String cn) throws ClassNotFoundException, SecurityException, NoSuchMethodException { + Class c = XPathExpression.class.getClassLoader().loadClass(cn); + if( !XPathEvaluator.class.isAssignableFrom(c) ) { + throw new ClassCastException(""+c+" is not an instance of "+XPathEvaluator.class); + } + return c.getConstructor(new Class[]{String.class}); + } + + private final String xpath; + private final XPathEvaluator evaluator; + + static public interface XPathEvaluator { + public boolean evaluate(AMQMessage message) throws AMQException; + } + + XPathExpression(String xpath) { + this.xpath = xpath; + this.evaluator = createEvaluator(xpath); + } + + private XPathEvaluator createEvaluator(String xpath2) { + try { + return (XPathEvaluator)EVALUATOR_CONSTRUCTOR.newInstance(new Object[]{xpath}); + } catch (InvocationTargetException e) { + Throwable cause = e.getCause(); + if( cause instanceof RuntimeException ) { + throw (RuntimeException)cause; + } + throw new RuntimeException("Invalid XPath Expression: "+xpath+" reason: "+e.getMessage(), e); + } catch (Throwable e) { + throw new RuntimeException("Invalid XPath Expression: "+xpath+" reason: "+e.getMessage(), e); + } + } + + public Object evaluate(AMQMessage message) throws AMQException { +// try { +//FIXME this is flow to disk work +// if( message.isDropped() ) +// return null; + return evaluator.evaluate(message) ? Boolean.TRUE : Boolean.FALSE; +// } catch (IOException e) { +// +// JMSException exception = new JMSException(e.getMessage()); +// exception.initCause(e); +// throw exception; +// +// } + + } + + public String toString() { + return "XPATH "+ConstantExpression.encodeString(xpath); + } + + /** + * @param message + * @return true if the expression evaluates to Boolean.TRUE. + * @throws JMSException + */ + public boolean matches(AMQMessage message) throws AMQException + { + Object object = evaluate(message); + return object!=null && object==Boolean.TRUE; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java new file mode 100644 index 0000000000..da8a61650a --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java @@ -0,0 +1,58 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.AMQException; + +import javax.jms.JMSException; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + +/** + * Used to evaluate an XQuery Expression in a JMS selector. + */ +public final class XQueryExpression implements BooleanExpression { + private final String xpath; + + XQueryExpression(String xpath) { + super(); + this.xpath = xpath; + } + + public Object evaluate(AMQMessage message) throws AMQException { + return Boolean.FALSE; + } + + public String toString() { + return "XQUERY "+ConstantExpression.encodeString(xpath); + } + + /** + * @param message + * @return true if the expression evaluates to Boolean.TRUE. + * @throws JMSException + */ + public boolean matches(AMQMessage message) throws AMQException + { + Object object = evaluate(message); + return object!=null && object==Boolean.TRUE; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java b/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java new file mode 100644 index 0000000000..f74e0cedec --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java @@ -0,0 +1,109 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + +import java.io.StringReader; +import java.io.ByteArrayInputStream; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.TextMessage; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; + +//import org.apache.activemq.command.Message; +//import org.apache.activemq.util.ByteArrayInputStream; +import org.apache.xpath.CachedXPathAPI; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.AMQException; +import org.w3c.dom.Document; +import org.w3c.dom.traversal.NodeIterator; +import org.xml.sax.InputSource; + +public class XalanXPathEvaluator implements XPathExpression.XPathEvaluator { + + private final String xpath; + + public XalanXPathEvaluator(String xpath) { + this.xpath = xpath; + } + + public boolean evaluate(AMQMessage m) throws AMQException + { + try + { + + if( m instanceof TextMessage ) { + String text = ((TextMessage)m).getText(); + return evaluate(text); + } else if ( m instanceof BytesMessage ) { + BytesMessage bm = (BytesMessage) m; + byte data[] = new byte[(int) bm.getBodyLength()]; + bm.readBytes(data); + return evaluate(data); + } + return false; + } + catch (JMSException e) + { + throw new AMQException("Error evaluting message: " + e, e); + } + } + + private boolean evaluate(byte[] data) { + try { + + InputSource inputSource = new InputSource(new ByteArrayInputStream(data)); + + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + factory.setNamespaceAware(true); + DocumentBuilder dbuilder = factory.newDocumentBuilder(); + Document doc = dbuilder.parse(inputSource); + + CachedXPathAPI cachedXPathAPI = new CachedXPathAPI(); + NodeIterator iterator = cachedXPathAPI.selectNodeIterator(doc,xpath); + return iterator.nextNode()!=null; + + } catch (Throwable e) { + return false; + } + } + + private boolean evaluate(String text) { + try { + InputSource inputSource = new InputSource(new StringReader(text)); + + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + factory.setNamespaceAware(true); + DocumentBuilder dbuilder = factory.newDocumentBuilder(); + Document doc = dbuilder.parse(inputSource); + + // We should associated the cachedXPathAPI object with the message being evaluated + // since that should speedup subsequent xpath expressions. + CachedXPathAPI cachedXPathAPI = new CachedXPathAPI(); + NodeIterator iterator = cachedXPathAPI.selectNodeIterator(doc,xpath); + return iterator.nextNode()!=null; + } catch (Throwable e) { + return false; + } + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index 8b3ced9811..164094ac58 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -21,10 +21,12 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInvalidSelectorException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.framing.BasicConsumeBody; import org.apache.qpid.framing.BasicConsumeOkBody; import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.ConsumerTagNotUniqueException; import org.apache.qpid.server.exchange.ExchangeRegistry; @@ -32,6 +34,7 @@ import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.log4j.Logger; @@ -74,8 +77,9 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic } try { - String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck); - if(!body.nowait) + String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, + body.arguments, body.noLocal); + if (!body.nowait) { session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag)); } @@ -83,10 +87,19 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic //now allow queue to start async processing of any backlog of messages queue.deliverAsync(); } + catch (AMQInvalidSelectorException ise) + { + _log.info("Closing connection due to invalid selector"); + session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, AMQConstant.INVALID_SELECTOR.getCode(), + ise.getMessage(), BasicConsumeBody.CLASS_ID, + BasicConsumeBody.METHOD_ID)); + } catch (ConsumerTagNotUniqueException e) { String msg = "Non-unique consumer tag, '" + body.consumerTag + "'"; - session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg, BasicConsumeBody.CLASS_ID, BasicConsumeBody.METHOD_ID)); + session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg, + BasicConsumeBody.CLASS_ID, + BasicConsumeBody.METHOD_ID)); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java index 00ae547683..79b2e11bca 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java @@ -78,12 +78,19 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< AuthenticationResult authResult = authMgr.authenticate(ss, body.response); + //save clientProperties + if (protocolSession.getClientProperties() == null) + { + protocolSession.setClientProperties(body.clientProperties); + } + switch (authResult.status) { case ERROR: throw new AMQException("Authentication failed"); case SUCCESS: _logger.info("Connected as: " + ss.getAuthorizationID()); + stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, Integer.MAX_VALUE, getConfiguredFrameSize(), HeartbeatConfig.getInstance().getDelay()); @@ -122,7 +129,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< static int getConfiguredFrameSize() { final Configuration config = ApplicationRegistry.getInstance().getConfiguration(); - final int framesize = config.getInt("advanced.framesize", DEFAULT_FRAME_SIZE); + final int framesize = config.getInt("advanced.framesize", DEFAULT_FRAME_SIZE); _logger.info("Framesize set to " + framesize); return framesize; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java b/java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java new file mode 100644 index 0000000000..aba3b88a59 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.message; + +public interface MessageDecorator +{ +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java b/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java new file mode 100644 index 0000000000..376f88cbf1 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.message.jms; + +import org.apache.qpid.server.message.MessageDecorator; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.AMQException; + +import javax.jms.Message; +import javax.jms.JMSException; +import javax.jms.Destination; +import javax.jms.MessageNotWriteableException; +import java.util.Enumeration; + +public class JMSMessage implements MessageDecorator +{ + + private AMQMessage _message; + private BasicContentHeaderProperties _properties; + + public JMSMessage(AMQMessage message) throws AMQException + { + _message = message; + ContentHeaderBody contentHeader = message.getContentHeaderBody(); + _properties = (BasicContentHeaderProperties) contentHeader.properties; + } + + protected void checkWriteable() throws MessageNotWriteableException + { + //The broker should not modify a message. +// if (_readableMessage) + { + throw new MessageNotWriteableException("The broker should not modify a message."); + } + } + + + public String getJMSMessageID() + { + return _properties.getMessageId(); + } + + public void setJMSMessageID(String string) throws MessageNotWriteableException + { + checkWriteable(); + _properties.setMessageId(string); + } + + public long getJMSTimestamp() + { + return _properties.getTimestamp(); + } + + public void setJMSTimestamp(long l) throws MessageNotWriteableException + { + checkWriteable(); + _properties.setTimestamp(l); + } + + public byte[] getJMSCorrelationIDAsBytes() + { + return _properties.getCorrelationId().getBytes(); + } + +// public void setJMSCorrelationIDAsBytes(byte[] bytes) +// { +// } + + public void setJMSCorrelationID(String string) throws MessageNotWriteableException + { + checkWriteable(); + _properties.setCorrelationId(string); + } + + public String getJMSCorrelationID() + { + return _properties.getCorrelationId(); + } + + public String getJMSReplyTo() + { + return _properties.getReplyTo(); + } + + public void setJMSReplyTo(Destination destination) throws MessageNotWriteableException + { + checkWriteable(); + _properties.setReplyTo(destination.toString()); + } + + public String getJMSDestination() + { + //fixme should be a deestination + return ""; + } + + public void setJMSDestination(Destination destination) throws MessageNotWriteableException + { + checkWriteable(); + //_properties.setDestination(destination.toString()); + } + + public int getJMSDeliveryMode() + { + return _properties.getDeliveryMode(); + } + + public void setJMSDeliveryMode(byte i) throws MessageNotWriteableException + { + checkWriteable(); + _properties.setDeliveryMode(i); + } + + public boolean getJMSRedelivered() + { + return _message.isRedelivered(); + } + + public void setJMSRedelivered(boolean b) throws MessageNotWriteableException + { + checkWriteable(); + _message.setRedelivered(b); + } + + public String getJMSType() + { + return _properties.getType(); + } + + public void setJMSType(String string) throws MessageNotWriteableException + { + checkWriteable(); + _properties.setType(string); + } + + public long getJMSExpiration() + { + return _properties.getExpiration(); + } + + public void setJMSExpiration(long l) throws MessageNotWriteableException + { + checkWriteable(); + _properties.setExpiration(l); + } + + public int getJMSPriority() + { + return _properties.getPriority(); + } + + public void setJMSPriority(byte i) throws MessageNotWriteableException + { + checkWriteable(); + _properties.setPriority(i); + } + + public void clearProperties() throws MessageNotWriteableException + { + checkWriteable(); + _properties.getJMSHeaders().clear(); + } + + public boolean propertyExists(String string) + { + return _properties.getJMSHeaders().propertyExists(string); + } + + public boolean getBooleanProperty(String string) throws JMSException + { + return _properties.getJMSHeaders().getBoolean(string); + } + + public byte getByteProperty(String string) throws JMSException + { + return _properties.getJMSHeaders().getByte(string); + } + + public short getShortProperty(String string) throws JMSException + { + return _properties.getJMSHeaders().getShort(string); + } + + public int getIntProperty(String string) throws JMSException + { + return _properties.getJMSHeaders().getInteger(string); + } + + public long getLongProperty(String string) throws JMSException + { + return _properties.getJMSHeaders().getLong(string); + } + + public float getFloatProperty(String string) throws JMSException + { + return _properties.getJMSHeaders().getFloat(string); + } + + public double getDoubleProperty(String string) throws JMSException + { + return _properties.getJMSHeaders().getDouble(string); + } + + public String getStringProperty(String string) throws JMSException + { + return _properties.getJMSHeaders().getString(string); + } + + public Object getObjectProperty(String string) throws JMSException + { + return _properties.getJMSHeaders().getObject(string); + } + + public Enumeration getPropertyNames() + { + return _properties.getJMSHeaders().getPropertyNames(); + } + + public void setBooleanProperty(String string, boolean b) throws JMSException + { + checkWriteable(); + _properties.getJMSHeaders().setBoolean(string, b); + } + + public void setByteProperty(String string, byte b) throws JMSException + { + checkWriteable(); + _properties.getJMSHeaders().setByte(string, b); + } + + public void setShortProperty(String string, short i) throws JMSException + { + checkWriteable(); + _properties.getJMSHeaders().setShort(string, i); + } + + public void setIntProperty(String string, int i) throws JMSException + { + checkWriteable(); + _properties.getJMSHeaders().setInteger(string, i); + } + + public void setLongProperty(String string, long l) throws JMSException + { + checkWriteable(); + _properties.getJMSHeaders().setLong(string, l); + } + + public void setFloatProperty(String string, float v) throws JMSException + { + checkWriteable(); + _properties.getJMSHeaders().setFloat(string, v); + } + + public void setDoubleProperty(String string, double v) throws JMSException + { + checkWriteable(); + _properties.getJMSHeaders().setDouble(string, v); + } + + public void setStringProperty(String string, String string1) throws JMSException + { + checkWriteable(); + _properties.getJMSHeaders().setString(string, string1); + } + + public void setObjectProperty(String string, Object object) throws JMSException + { + checkWriteable(); + _properties.getJMSHeaders().setObject(string, object); + } + + public void acknowledge() throws MessageNotWriteableException + { + checkWriteable(); + } + + public void clearBody() throws MessageNotWriteableException + { + checkWriteable(); + } + + public AMQMessage getAMQMessage() + { + return _message; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 1832f01b7a..8f10a06fe4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -26,9 +26,19 @@ import org.apache.mina.common.IoSession; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ConnectionStartBody; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.ProtocolVersionList; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.codec.AMQDecoder; -import org.apache.qpid.framing.*; + import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.management.Managable; @@ -84,6 +94,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, /* AMQP Version for this session */ private byte _major; private byte _minor; + private FieldTable _clientProperties; public ManagedObject getManagedObject() { @@ -119,7 +130,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { return new AMQProtocolSessionMBean(this); } - catch(JMException ex) + catch (JMException ex) { _logger.error("AMQProtocolSession MBean creation has failed ", ex); throw new AMQException("AMQProtocolSession MBean creation has failed ", ex); @@ -144,7 +155,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { ProtocolInitiation pi = (ProtocolInitiation) message; // this ensures the codec never checks for a PI message again - ((AMQDecoder)_codecFactory.getDecoder()).setExpectProtocolInitiation(false); + ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false); try { pi.checkVersion(this); // Fails if not correct @@ -153,7 +164,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _minor = pi.protocolMinor; String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms(); String locales = "en_US"; - AMQFrame response = ConnectionStartBody.createAMQFrame((short)0, pi.protocolMajor, pi.protocolMinor, null, + AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, pi.protocolMajor, pi.protocolMinor, null, mechanisms.getBytes(), locales.getBytes()); _minaProtocolSession.write(response); } @@ -195,7 +206,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _logger.debug("Method frame received: " + frame); } final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel, - (AMQMethodBody)frame.bodyFrame); + (AMQMethodBody) frame.bodyFrame); try { boolean wasAnyoneInterested = false; @@ -250,7 +261,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Content header frame received: " + frame); } - getChannel(frame.channel).publishContentHeader((ContentHeaderBody)frame.bodyFrame); + getChannel(frame.channel).publishContentHeader((ContentHeaderBody) frame.bodyFrame); } private void contentBodyReceived(AMQFrame frame) throws AMQException @@ -294,8 +305,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, return _channelMap.get(channelId); } - public void addChannel(AMQChannel channel) + public void addChannel(AMQChannel channel) throws AMQException { + if (_closed) + { + throw new AMQException("Session is closed"); + } + _channelMap.put(channel.getChannelId(), channel); checkForNotification(); } @@ -339,6 +355,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, * Close a specific channel. This will remove any resources used by the channel, including: * <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li> * </ul> + * * @param channelId id of the channel to close * @throws AMQException if an error occurs closing the channel * @throws IllegalArgumentException if the channel id is not valid @@ -365,6 +382,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, /** * In our current implementation this is used by the clustering code. + * * @param channelId */ public void removeChannel(int channelId) @@ -374,11 +392,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, /** * Initialise heartbeats on the session. + * * @param delay delay in seconds (not ms) */ public void initHeartbeats(int delay) { - if(delay > 0) + if (delay > 0) { _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay); _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.getInstance().getTimeout(delay)); @@ -388,6 +407,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, /** * Closes all channels that were opened by this protocol session. This frees up all resources * used by the channel. + * * @throws AMQException if an error occurs while closing any channel */ private void closeAllChannels() throws AMQException @@ -396,6 +416,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { channel.close(this); } + _channelMap.clear(); } /** @@ -404,7 +425,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, */ public void closeSession() throws AMQException { - if(!_closed) + if (!_closed) { _closed = true; closeAllChannels(); @@ -446,11 +467,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, // information is used by SASL primary. if (address instanceof InetSocketAddress) { - return ((InetSocketAddress)address).getHostName(); + return ((InetSocketAddress) address).getHostName(); } else if (address instanceof VmPipeAddress) { - return "vmpipe:" + ((VmPipeAddress)address).getPort(); + return "vmpipe:" + ((VmPipeAddress) address).getPort(); } else { @@ -468,6 +489,16 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _saslServer = saslServer; } + public FieldTable getClientProperties() + { + return _clientProperties; + } + + public void setClientProperties(FieldTable clientProperties) + { + _clientProperties = clientProperties; + } + /** * Convenience methods for managing AMQP version. * NOTE: Both major and minor will be set to 0 prior to protocol initiation. diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index acaf6b0d9b..a75627d240 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol; import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.AMQException; @@ -69,7 +70,7 @@ public interface AMQProtocolSession * @param channel the channel to associate with this session. It is an error to * associate the same channel with more than one session but this is not validated. */ - void addChannel(AMQChannel channel); + void addChannel(AMQChannel channel) throws AMQException; /** * Close a specific channel. This will remove any resources used by the channel, including: @@ -122,4 +123,9 @@ public interface AMQProtocolSession * @param saslServer */ void setSaslServer(SaslServer saslServer); + + + FieldTable getClientProperties(); + + void setClientProperties(FieldTable clientProperties); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java index a47d462810..d57f9b9be1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java @@ -18,6 +18,9 @@ package org.apache.qpid.server.protocol; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.MBeanConstructor; @@ -183,33 +186,17 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed } /** - * @see AMQMinaProtocolSession#closeChannel(int) - */ - public void closeChannel(int id) throws JMException - { - try - { - AMQChannel channel = _session.getChannel(id); - if (channel == null) - { - throw new JMException("The channel (channel Id = " + id + ") does not exist"); - } - - _session.closeChannel(id); - } - catch (AMQException ex) - { - throw new MBeanException(ex, ex.toString()); - } - } - - /** * closes the connection. The administrator can use this management operation to close connection to free up * resources. * @throws JMException */ public void closeConnection() throws JMException { + + final AMQFrame response = ConnectionCloseBody.createAMQFrame(0, AMQConstant.REPLY_SUCCESS.getCode(), + "Broker Management Console has closing the connection.", 0, 0); + _session.writeFrame(response); + try { _session.closeSession(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java index 2f3102b048..1a7b7e9e96 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java @@ -114,15 +114,6 @@ public interface ManagedConnection void rollbackTransactions(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId) throws JMException; /** - * Unsubscribes the consumers and unregisters the channel from managed objects. - */ - @MBeanOperation(name="closeChannel", - description="Closes the channel with given channel Id and connected consumers will be unsubscribed", - impact= MBeanOperationInfo.ACTION) - void closeChannel(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId) - throws Exception; - - /** * Closes all the related channels and unregisters this connection from managed objects. */ @MBeanOperation(name="closeConnection", diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 8f6377d80d..b7dcffe3cb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -26,10 +26,14 @@ import org.apache.qpid.framing.*; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.message.MessageDecorator; +import org.apache.qpid.server.message.jms.JMSMessage; import org.apache.log4j.Logger; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.ConcurrentHashMap; /** * Combines the information that make up a deliverable message into a more manageable form. @@ -38,6 +42,8 @@ public class AMQMessage { private static final Logger _log = Logger.getLogger(AMQMessage.class); + public static final String JMS_MESSAGE = "jms.message"; + /** * Used in clustering */ @@ -64,6 +70,9 @@ public class AMQMessage */ private boolean _deliveredToConsumer; + private ConcurrentHashMap<String, MessageDecorator> _decodedMessages; + private AtomicBoolean _taken; + private TransientMessageData _transientMessageData = new TransientMessageData(); /** @@ -141,6 +150,8 @@ public class AMQMessage _messageId = messageId; _txnContext = txnContext; _transientMessageData.setPublishBody(publishBody); + _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>(); + _taken = new AtomicBoolean(false); if (_log.isDebugEnabled()) { _log.debug("Message created with id " + messageId); @@ -358,6 +369,60 @@ public class AMQMessage return _publisher; } + /** + * Called selectors to determin if the message has already been sent + * @return _deliveredToConsumer + */ + public boolean getDeliveredToConsumer() + { + return _deliveredToConsumer; + } + + + public MessageDecorator getDecodedMessage(String type) throws AMQException + { + MessageDecorator msgtype = null; + + if (_decodedMessages != null) + { + msgtype = _decodedMessages.get(type); + + if (msgtype == null) + { + msgtype = decorateMessage(type); + } + } + + return msgtype; + } + + private MessageDecorator decorateMessage(String type) throws AMQException + { + MessageDecorator msgdec = null; + + if (type.equals(JMS_MESSAGE)) + { + msgdec = new JMSMessage(this); + } + + if (msgdec != null) + { + _decodedMessages.put(type, msgdec); + } + + return msgdec; + } + + public boolean taken() + { + return _taken.getAndSet(true); + } + + public void release() + { + _taken.set(false); + } + public boolean checkToken(Object token) { if (_tokens.contains(token)) @@ -507,8 +572,7 @@ public class AMQMessage } // - // Now start writing out the other content bodies - // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded + // Now start writing out the other content bodies // while (bodyFrameIterator.hasNext()) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 5f6d4c2939..7ab48598c4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -22,7 +22,7 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; @@ -96,7 +96,7 @@ public class AMQQueue implements Managable, Comparable * max allowed number of messages on a queue. */ private Integer _maxMessageCount = 10000; - + /** * max queue depth(KB) for the queue */ @@ -188,16 +188,29 @@ public class AMQQueue implements Managable, Comparable _subscribers = subscribers; _subscriptionFactory = subscriptionFactory; - //fixme - Pick one. - if (Boolean.getBoolean("concurrentdeliverymanager")) + //fixme - Make this configurable via the broker config.xml + if (System.getProperties().getProperty("deliverymanager") != null) { - _logger.info("Using ConcurrentDeliveryManager"); - _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this); + if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager")) + { + _logger.info("Using ConcurrentSelectorDeliveryManager"); + _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); + } + else if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager")) + { + _logger.info("Using ConcurrentDeliveryManager"); + _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this); + } + else + { + _logger.info("Using SynchronizedDeliveryManager"); + _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this); + } } else { - _logger.info("Using SynchronizedDeliveryManager"); - _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this); + _logger.info("Using Default DeliveryManager: ConcurrentSelectorDeliveryManager"); + _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); } } @@ -349,12 +362,26 @@ public class AMQQueue implements Managable, Comparable _bindings.addBinding(routingKey, exchange); } - public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks) + public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters) throws AMQException + { + registerProtocolSession(ps, channel, consumerTag, acks, filters, false); + } + + public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException { debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this); - Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks); + Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal); + + if(subscription.hasFilters()) + { + if (_deliveryMgr.hasQueuedMessages()) + { + _deliveryMgr.populatePreDeliveryQueue(subscription); + } + } + _subscribers.addSubscriber(subscription); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java index 4f6173fa2a..dda074aca7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java @@ -195,6 +195,11 @@ public class ConcurrentDeliveryManager implements DeliveryManager return new ArrayList<AMQMessage>(_messages); } + public void populatePreDeliveryQueue(Subscription subscription) + { + //no-op . This DM has no PreDeliveryQueues + } + public synchronized void removeAMessageFromTop() throws AMQException { AMQMessage msg = poll(); @@ -309,7 +314,6 @@ public class ConcurrentDeliveryManager implements DeliveryManager else { s.send(msg, _queue); - msg.setDeliveredToConsumer(); } } finally diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java new file mode 100644 index 0000000000..23e2754eb2 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -0,0 +1,388 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; +import org.apache.qpid.configuration.Configured; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.server.configuration.Configurator; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.Executor; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicBoolean; + + +/** + * Manages delivery of messages on behalf of a queue + */ +public class ConcurrentSelectorDeliveryManager implements DeliveryManager +{ + private static final Logger _log = Logger.getLogger(ConcurrentSelectorDeliveryManager.class); + + @Configured(path = "advanced.compressBufferOnQueue", + defaultValue = "false") + public boolean compressBufferOnQueue; + /** + * Holds any queued messages + */ + private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>(); + //private int _messageCount; + /** + * Ensures that only one asynchronous task is running for this manager at + * any time. + */ + private final AtomicBoolean _processing = new AtomicBoolean(); + /** + * The subscriptions on the queue to whom messages are delivered + */ + private final SubscriptionManager _subscriptions; + + /** + * A reference to the queue we are delivering messages for. We need this to be able + * to pass the code that handles acknowledgements a handle on the queue. + */ + private final AMQQueue _queue; + + + /** + * Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced + * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered + * via the async thread. + * <p/> + * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue. + */ + private ReentrantLock _lock = new ReentrantLock(); + + + ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) + { + + //Set values from configuration + Configurator.configure(this); + + if (compressBufferOnQueue) + { + _log.warn("Compressing Buffers on queue."); + } + + _subscriptions = subscriptions; + _queue = queue; + } + + + private boolean addMessageToQueue(AMQMessage msg) + { + // Shrink the ContentBodies to their actual size to save memory. + if (compressBufferOnQueue) + { + Iterator<ContentBody> it = msg.getContentBodyIterator(); + while (it.hasNext()) + { + ContentBody cb = it.next(); + cb.reduceBufferToFit(); + } + } + + _messages.offer(msg); + + return true; + } + + + public boolean hasQueuedMessages() + { + _lock.lock(); + try + { + return !_messages.isEmpty(); + } + finally + { + _lock.unlock(); + } + } + + public int getQueueMessageCount() + { + return getMessageCount(); + } + + /** + * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size. + * The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue. + * + * @return int the number of messages in the delivery queue. + */ + private int getMessageCount() + { + return _messages.size(); + } + + + public synchronized List<AMQMessage> getMessages() + { + return new ArrayList<AMQMessage>(_messages); + } + + public void populatePreDeliveryQueue(Subscription subscription) + { + if (_log.isTraceEnabled()) + { + _log.trace("Populating PreDeliveryQueue for Subscription(" + System.identityHashCode(subscription) + ")"); + } + + Iterator<AMQMessage> currentQueue = _messages.iterator(); + + while (currentQueue.hasNext()) + { + AMQMessage message = currentQueue.next(); + if (subscription.hasInterest(message)) + { + subscription.enqueueForPreDelivery(message); + } + } + } + + public synchronized void removeAMessageFromTop() throws AMQException + { + AMQMessage msg = poll(); + if (msg != null) + { + msg.dequeue(_queue); + } + } + + public synchronized void clearAllMessages() throws AMQException + { + AMQMessage msg = poll(); + while (msg != null) + { + msg.dequeue(_queue); + msg = poll(); + } + } + + + private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub) + { + AMQMessage message = messages.peek(); + + while (message != null && (sub.isBrowser() || message.taken())) + { + //remove the already taken message + messages.poll(); + // try the next message + message = messages.peek(); + } + return message; + } + + public void sendNextMessage(Subscription sub, Queue<AMQMessage> messageQueue) + { + AMQMessage message = null; + try + { + message = getNextMessage(messageQueue, sub); + + // message will be null if we have no messages in the messageQueue. + if (message == null) + { + return; + } + _log.info("Async Delivery Message:" + message + " to :" + sub); + + sub.send(message, _queue); + + //remove sent message from our queue. + messageQueue.poll(); + } + catch (AMQException e) + { + message.release(); + _log.error("Unable to deliver message as dequeue failed: " + e, e); + } + } + + /** + * Only one thread should ever execute this method concurrently, but + * it can do so while other threads invoke deliver(). + */ + private void processQueue() + { + // Continue to process delivery while we haveSubscribers and messages + boolean hasSubscribers = _subscriptions.hasActiveSubscribers(); + + while (hasSubscribers && hasQueuedMessages()) + { + hasSubscribers = false; + + for (Subscription sub : _subscriptions.getSubscriptions()) + { + if (!sub.isSuspended()) + { + sendNextMessage(sub); + + hasSubscribers = true; + } + } + } + } + + private void sendNextMessage(Subscription sub) + { + if (sub.hasFilters()) + { + sendNextMessage(sub, sub.getPreDeliveryQueue()); + if (sub.isAutoClose()) + { + if (sub.getPreDeliveryQueue().isEmpty()) + { + sub.close(); + } + } + } + else + { + sendNextMessage(sub, _messages); + } + } + + private AMQMessage poll() + { + return _messages.poll(); + } + + public void deliver(String name, AMQMessage msg) throws AMQException + { + _log.info(id() + "deliver :" + System.identityHashCode(msg)); + + //Check if we have someone to deliver the message to. + _lock.lock(); + try + { + Subscription s = _subscriptions.nextSubscriber(msg); + + if (s == null) //no-one can take the message right now. + { + _log.info(id() + "Testing Message(" + System.identityHashCode(msg) + ") for Queued Delivery"); + if (!msg.getPublishBody().immediate) + { + addMessageToQueue(msg); + + //release lock now message is on queue. + _lock.unlock(); + + //Pre Deliver to all subscriptions + _log.info(id() + "We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to."); + for (Subscription sub : _subscriptions.getSubscriptions()) + { + + // stop if the message gets delivered whilst PreDelivering if we have a shared queue. + if (_queue.isShared() && msg.getDeliveredToConsumer()) + { + _log.info(id() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) + ") is already delivered."); + continue; + } + + // Only give the message to those that want them. + if (sub.hasInterest(msg)) + { + _log.info(id() + "Queuing message(" + System.identityHashCode(msg) + ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")"); + sub.enqueueForPreDelivery(msg); + } + } + } + } + else + { + //release lock now + _lock.unlock(); + + _log.info(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + System.identityHashCode(s) + ") :" + s); + //Deliver the message + s.send(msg, _queue); + } + } + finally + { + //ensure lock is released + if (_lock.isLocked()) + { + _lock.unlock(); + } + } + } + + //fixme remove + private final String id = "(" + String.valueOf(System.identityHashCode(this)) + ")"; + + private String id() + { + return id; + } + + Runner asyncDelivery = new Runner(); + + private class Runner implements Runnable + { + public void run() + { + boolean running = true; + while (running) + { + processQueue(); + + //Check that messages have not been added since we did our last peek(); + // Synchronize with the thread that adds to the queue. + // If the queue is still empty then we can exit + + if (!(hasQueuedMessages() && _subscriptions.hasActiveSubscribers())) + { + running = false; + _processing.set(false); + } + } + } + } + + public void processAsync(Executor executor) + { + _log.info("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" + + " Active:" + _subscriptions.hasActiveSubscribers() + + " Processing:" + _processing.get()); + + if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers()) + { + //are we already running? if so, don't re-run + if (_processing.compareAndSet(false, true)) + { + executor.execute(asyncDelivery); + } + } + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index b8ba0118ab..6f31616114 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -73,4 +73,6 @@ interface DeliveryManager void clearAllMessages() throws AMQException; List<AMQMessage> getMessages(); + + void populatePreDeliveryQueue(Subscription subscription); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java index 9136264087..d04b6d3f60 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java @@ -22,6 +22,8 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import java.util.Queue; + public interface Subscription { void send(AMQMessage msg, AMQQueue queue) throws AMQException; @@ -29,4 +31,18 @@ public interface Subscription boolean isSuspended(); void queueDeleted(AMQQueue queue) throws AMQException; + + boolean hasFilters(); + + boolean hasInterest(AMQMessage msg); + + Queue<AMQMessage> getPreDeliveryQueue(); + + void enqueueForPreDelivery(AMQMessage msg); + + boolean isAutoClose(); + + void close(); + + boolean isBrowser(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java index 0fd44e4fbc..2bb77dc649 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.FieldTable; /** * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This @@ -32,9 +33,10 @@ import org.apache.qpid.AMQException; */ public interface SubscriptionFactory { - Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks) - throws AMQException; + Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, + FieldTable filters, boolean noLocal) throws AMQException; - Subscription createSubscription(int channel, AMQProtocolSession protocolSession,String consumerTag) - throws AMQException; + + Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag) + throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index f94fd6259f..67380f024c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -22,8 +22,21 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.common.ClientProperties; +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.BasicDeliverBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.filter.FilterManager; +import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.mina.common.ByteBuffer; + +import java.util.Queue; /** * Encapsulation of a supscription to a queue. @@ -44,23 +57,30 @@ public class SubscriptionImpl implements Subscription private final Object sessionKey; + private Queue<AMQMessage> _messages; + + private final boolean _noLocal; + /** * True if messages need to be acknowledged */ private final boolean _acks; + private FilterManager _filters; + private final boolean _isBrowser; + private final Boolean _autoClose; + private boolean _closed = false; public static class Factory implements SubscriptionFactory { - public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks) - throws AMQException + public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException { - return new SubscriptionImpl(channel, protocolSession, consumerTag, acks); + return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal); } public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag) throws AMQException { - return new SubscriptionImpl(channel, protocolSession, consumerTag); + return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false); } } @@ -68,6 +88,13 @@ public class SubscriptionImpl implements Subscription String consumerTag, boolean acks) throws AMQException { + this(channelId, protocolSession, consumerTag, acks, null, false); + } + + public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession, + String consumerTag, boolean acks, FieldTable filters, boolean noLocal) + throws AMQException + { AMQChannel channel = protocolSession.getChannel(channelId); if (channel == null) { @@ -79,8 +106,61 @@ public class SubscriptionImpl implements Subscription this.consumerTag = consumerTag; sessionKey = protocolSession.getKey(); _acks = acks; + _noLocal = noLocal; + + _filters = FilterManagerFactory.createManager(filters); + + + if (_filters != null) + { + Object isBrowser = filters.get(AMQPFilterTypes.NO_CONSUME.getValue()); + if (isBrowser != null) + { + _isBrowser = (Boolean) isBrowser; + } + else + { + _isBrowser = false; + } + } + else + { + _isBrowser = false; + } + + + if (_filters != null) + { + Object autoClose = filters.get(AMQPFilterTypes.AUTO_CLOSE.getValue()); + if (autoClose != null) + { + _autoClose = (Boolean) autoClose; + } + else + { + _autoClose = false; + } + } + else + { + _autoClose = false; + } + + + if (_filters != null) + { + _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>(); + + + } + else + { + // Reference the DeliveryManager + _messages = null; + } } + public SubscriptionImpl(int channel, AMQProtocolSession protocolSession, String consumerTag) throws AMQException @@ -125,6 +205,44 @@ public class SubscriptionImpl implements Subscription { if (msg != null) { + if (_isBrowser) + { + sendToBrowser(msg, queue); + } + else + { + sendToConsumer(msg, queue); + } + } + else + { + _logger.error("Attempt to send Null message", new NullPointerException()); + } + } + + private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws AMQException + { + // We don't decrement the reference here as we don't want to consume the message + // but we do want to send it to the client. + + synchronized(channel) + { + long deliveryTag = channel.getNextDeliveryTag(); + + // We don't need to add the message to the unacknowledgedMap as we don't need to know if the client + // received the message. If it is lost in transit that is not important. + if (_acks) + { + channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue); + } + msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag); + } + } + + private void sendToConsumer(AMQMessage msg, AMQQueue queue) throws AMQException + { + try + { // if we do not need to wait for client acknowledgements // we can decrement the reference count immediately. @@ -150,9 +268,9 @@ public class SubscriptionImpl implements Subscription msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag); } } - else + finally { - _logger.error("Attempt to send Null message", new NullPointerException()); + msg.setDeliveredToConsumer(); } } @@ -170,4 +288,110 @@ public class SubscriptionImpl implements Subscription { channel.queueDeleted(queue); } + + public boolean hasFilters() + { + return _filters != null; + } + + public boolean hasInterest(AMQMessage msg) + { + if (_noLocal) + { + // We don't want local messages so check to see if message is one we sent + if (protocolSession.getClientProperties().get(ClientProperties.instance.toString()).equals( + msg.getPublisher().getClientProperties().get(ClientProperties.instance.toString()))) + { + if (_logger.isTraceEnabled()) + { + _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" + + System.identityHashCode(msg) + ")"); + } + return false; + } + else // if not then filter the message. + { + if (_logger.isTraceEnabled()) + { + _logger.trace("(" + System.identityHashCode(this) + ") local message(" + System.identityHashCode(msg) + + ") but not ours so filtering"); + } + return checkFilters(msg); + } + } + else + { + if (_logger.isTraceEnabled()) + { + _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg)); + } + return checkFilters(msg); + } + } + + private boolean checkFilters(AMQMessage msg) + { + if (_filters != null) + { + if (_logger.isTraceEnabled()) + { + _logger.trace("(" + System.identityHashCode(this) + ") has filters."); + } + return _filters.allAllow(msg); + } + else + { + if (_logger.isTraceEnabled()) + { + _logger.trace("(" + System.identityHashCode(this) + ") has no filters"); + } + + return true; + } + } + + public Queue<AMQMessage> getPreDeliveryQueue() + { + return _messages; + } + + public void enqueueForPreDelivery(AMQMessage msg) + { + if (_messages != null) + { + _messages.offer(msg); + } + } + + public boolean isAutoClose() + { + return _autoClose; + } + + public void close() + { + if (!_closed) + { + _logger.info("Closing autoclose subscription:" + this); + protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), consumerTag)); + _closed = true; + } + } + + public boolean isBrowser() + { + return _isBrowser; + } + + + private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange) + { + AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag, + deliveryTag, false, exchange, + routingKey); + ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem? + deliverFrame.writePayload(buf); + buf.flip(); + return buf; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java index 353b461c8d..4df88baebc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java @@ -20,12 +20,15 @@ */ package org.apache.qpid.server.queue; +import java.util.List; + /** * Abstraction of actor that will determine the subscriber to whom * a message will be sent. */ public interface SubscriptionManager { + public List<Subscription> getSubscriptions(); public boolean hasActiveSubscribers(); public Subscription nextSubscriber(AMQMessage msg); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index 0de036de36..8272202571 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -60,6 +60,7 @@ class SubscriptionSet implements WeightedSubscriptionManager /** * Remove the subscription, returning it if it was found + * * @param subscription * @return null if no match was found */ @@ -92,7 +93,7 @@ class SubscriptionSet implements WeightedSubscriptionManager /** * Return the next unsuspended subscription or null if not found. - * + * <p/> * Performance note: * This method can scan all items twice when looking for a subscription that is not * suspended. The worst case occcurs when all subscriptions are suspended. However, it is does this @@ -107,31 +108,51 @@ class SubscriptionSet implements WeightedSubscriptionManager return null; } - try { - final Subscription result = nextSubscriber(); - if (result == null) { + try + { + final Subscription result = nextSubscriberImpl(msg); + if (result == null) + { _currentSubscriber = 0; - return nextSubscriber(); - } else { + return nextSubscriberImpl(msg); + } + else + { return result; } - } catch (IndexOutOfBoundsException e) { + } + catch (IndexOutOfBoundsException e) + { _currentSubscriber = 0; - return nextSubscriber(); + return nextSubscriber(msg); } } - private Subscription nextSubscriber() + private Subscription nextSubscriberImpl(AMQMessage msg) { final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber); - while (iterator.hasNext()) { + while (iterator.hasNext()) + { Subscription subscription = iterator.next(); ++_currentSubscriber; subscriberScanned(); - if (!subscription.isSuspended()) { - return subscription; + + if (!subscription.isSuspended()) + { + if (subscription.hasInterest(msg)) + { + // if the queue is not empty then this client is ready to receive a message. + //FIXME the queue could be full of sent messages. + // Either need to clean all PDQs after sending a message + // OR have a clean up thread that runs the PDQs expunging the messages. + if (!subscription.hasFilters() || subscription.getPreDeliveryQueue().isEmpty()) + { + return subscription; + } + } } } + return null; } @@ -147,11 +168,19 @@ class SubscriptionSet implements WeightedSubscriptionManager return _subscriptions.isEmpty(); } + public List<Subscription> getSubscriptions() + { + return _subscriptions; + } + public boolean hasActiveSubscribers() { for (Subscription s : _subscriptions) { - if (!s.isSuspended()) return true; + if (!s.isSuspended()) + { + return true; + } } return false; } @@ -161,7 +190,10 @@ class SubscriptionSet implements WeightedSubscriptionManager int count = 0; for (Subscription s : _subscriptions) { - if (!s.isSuspended()) count++; + if (!s.isSuspended()) + { + count++; + } } return count; } @@ -169,6 +201,7 @@ class SubscriptionSet implements WeightedSubscriptionManager /** * Notification that a queue has been deleted. This is called so that the subscription can inform the * channel, which in turn can update its list of unacknowledged messages. + * * @param queue */ public void queueDeleted(AMQQueue queue) throws AMQException @@ -179,7 +212,8 @@ class SubscriptionSet implements WeightedSubscriptionManager } } - int size() { + int size() + { return _subscriptions.size(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java index c8715f263f..7332ffbbee 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java @@ -35,7 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean; */ class SynchronizedDeliveryManager implements DeliveryManager { - private static final Logger _log = Logger.getLogger(ConcurrentDeliveryManager.class); + private static final Logger _log = Logger.getLogger(SynchronizedDeliveryManager.class); /** * Holds any queued messages @@ -124,6 +124,11 @@ class SynchronizedDeliveryManager implements DeliveryManager return new ArrayList<AMQMessage>(_messages); } + public void populatePreDeliveryQueue(Subscription subscription) + { + //no-op . This DM has no PreDeliveryQueues + } + public synchronized void removeAMessageFromTop() throws AMQException { AMQMessage msg = poll(); @@ -245,7 +250,6 @@ class SynchronizedDeliveryManager implements DeliveryManager else { s.send(msg, _queue); - msg.setDeliveredToConsumer(); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 8c3692a98d..7321854034 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -31,6 +31,7 @@ import org.apache.qpid.server.store.MessageStore; import java.util.LinkedList; import java.util.List; +import java.util.Set; /** * @author Apache Software Foundation @@ -49,6 +50,8 @@ public class NonTransactionalContext implements TransactionalContext */ private final List<RequiredDeliveryException> _returnMessages; + private Set<Long> _browsedAcks; + private final MessageStore _messageStore; /** @@ -57,11 +60,12 @@ public class NonTransactionalContext implements TransactionalContext private boolean _inTran; public NonTransactionalContext(MessageStore messageStore, AMQChannel channel, - List<RequiredDeliveryException> returnMessages) + List<RequiredDeliveryException> returnMessages, Set<Long> browsedAcks) { _channel = channel; _returnMessages = returnMessages; _messageStore = messageStore; + _browsedAcks = browsedAcks; } public void beginTranIfNecessary() throws AMQException @@ -111,12 +115,19 @@ public class NonTransactionalContext implements TransactionalContext //Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero, // tells the server to acknowledge all outstanding mesages. _log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + - unacknowledgedMessageMap.size()); + unacknowledgedMessageMap.size()); unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() { public boolean callback(UnacknowledgedMessage message) throws AMQException { - message.discard(); + if (!_browsedAcks.contains(deliveryTag)) + { + message.discard(); + } + else + { + _browsedAcks.remove(deliveryTag); + } return false; } @@ -137,7 +148,14 @@ public class NonTransactionalContext implements TransactionalContext unacknowledgedMessageMap.drainTo(acked, deliveryTag); for (UnacknowledgedMessage msg : acked) { - msg.discard(); + if (!_browsedAcks.contains(deliveryTag)) + { + msg.discard(); + } + else + { + _browsedAcks.remove(deliveryTag); + } } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java b/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java index 4767844abe..b58d551226 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java +++ b/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java @@ -20,10 +20,15 @@ */ package org.apache.qpid.server.util; +import org.apache.log4j.Logger; + import java.util.Iterator; public class CircularBuffer implements Iterable { + + private static final Logger _logger = Logger.getLogger(CircularBuffer.class); + private final Object[] _log; private int _size; private int _index; @@ -102,7 +107,7 @@ public class CircularBuffer implements Iterable { for(Object o : this) { - System.out.println(o); + _logger.info(o); } } @@ -120,7 +125,7 @@ public class CircularBuffer implements Iterable for(String s : items) { buffer.add(s); - System.out.println(buffer); + _logger.info(buffer); } } } diff --git a/java/client/pom.xml b/java/client/pom.xml index f80db8b774..73090078eb 100644 --- a/java/client/pom.xml +++ b/java/client/pom.xml @@ -35,7 +35,6 @@ <properties> <topDirectoryLocation>..</topDirectoryLocation> - <amqj.logging.level>warn</amqj.logging.level> </properties> <dependencies> @@ -98,6 +97,11 @@ <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <configuration> <systemProperties> diff --git a/java/client/src/log4j.properties b/java/client/src/log4j.properties index d3135ff574..5614cb76f3 100644 --- a/java/client/src/log4j.properties +++ b/java/client/src/log4j.properties @@ -1,10 +1,10 @@ -log4j.rootLogger=${root.logging.level} +log4j.rootLogger=${amqj.logging.level} log4j.logger.org.apache.qpid=${amqj.logging.level}, console log4j.additivity.org.apache.qpid=false log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.Threshold=info +log4j.appender.console.Threshold=debug log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n diff --git a/java/client/src/main/java/log4j.properties b/java/client/src/main/java/log4j.properties index db8f43cb3b..a4b4d3ed6c 100644 --- a/java/client/src/main/java/log4j.properties +++ b/java/client/src/main/java/log4j.properties @@ -16,10 +16,10 @@ # specific language governing permissions and limitations # under the License. # -log4j.rootLogger=WARN +log4j.rootLogger=${amqj.logging.level} -log4j.logger.org.apache.qpid=WARN, console +log4j.logger.org.apache.qpid=${amqj.logging.level}, console log4j.additivity.org.apache.qpid=false log4j.appender.console=org.apache.log4j.ConsoleAppender diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java new file mode 100644 index 0000000000..5c753946a6 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java @@ -0,0 +1,128 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import org.apache.log4j.Logger; + +import java.util.Enumeration; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.jms.*; +import javax.jms.IllegalStateException; + +public class AMQQueueBrowser implements QueueBrowser +{ + private static final Logger _logger = Logger.getLogger(AMQQueueBrowser.class); + + + private AtomicBoolean _isClosed = new AtomicBoolean(); + private final AMQSession _session; + private final AMQQueue _queue; + private final ArrayList<BasicMessageConsumer> _consumers = new ArrayList<BasicMessageConsumer>(); + private final String _messageSelector; + + + AMQQueueBrowser(AMQSession session, AMQQueue queue, String messageSelector) throws JMSException + { + _session = session; + _queue = queue; + _messageSelector = (messageSelector == null) || (messageSelector.trim().length() == 0) ? null : messageSelector; + BasicMessageConsumer consumer = (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); + consumer.close(); + } + + public Queue getQueue() throws JMSException + { + checkState(); + return _queue; + } + + private void checkState() throws JMSException + { + if (_isClosed.get()) + { + throw new IllegalStateException("Queue Browser"); + } + if (_session.isClosed()) + { + throw new IllegalStateException("Session is closed"); + } + + } + + public String getMessageSelector() throws JMSException + { + + checkState(); + return _messageSelector; + } + + public Enumeration getEnumeration() throws JMSException + { + checkState(); + final BasicMessageConsumer consumer = (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); + _consumers.add(consumer); + + return new Enumeration() + { + + + Message _nextMessage = consumer.receive(); + + + public boolean hasMoreElements() + { + _logger.info("QB:hasMoreElements:" + (_nextMessage != null)); + return (_nextMessage != null); + } + + public Object nextElement() + { + Message msg = _nextMessage; + try + { + _logger.info("QB:nextElement about to receive"); + + _nextMessage = consumer.receive(); + _logger.info("QB:nextElement received:" + _nextMessage); + } + catch (JMSException e) + { + _logger.warn("Exception caught while queue browsing", e); + _nextMessage = null; + } + + return msg; + } + }; + } + + public void close() throws JMSException + { + for (BasicMessageConsumer consumer : _consumers) + { + consumer.close(); + } + _consumers.clear(); + } + + +} diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index fae5e7ac08..2136d565f1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -23,13 +23,15 @@ package org.apache.qpid.client; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; +import org.apache.qpid.AMQInvalidSelectorException; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.client.failover.FailoverSupport; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.protocol.AMQMethodEvent; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.protocol.AMQMethodEvent; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.*; @@ -69,15 +71,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK; /** - * Used to reference durable subscribers so they requests for unsubscribe can be handled - * correctly. Note this only keeps a record of subscriptions which have been created - * in the current instance. It does not remember subscriptions between executions of the - * client + * Used to reference durable subscribers so they requests for unsubscribe can be handled + * correctly. Note this only keeps a record of subscriptions which have been created + * in the current instance. It does not remember subscriptions between executions of the + * client */ private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions = new ConcurrentHashMap<String, TopicSubscriberAdaptor>(); private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap = - new ConcurrentHashMap<BasicMessageConsumer, String>(); + new ConcurrentHashMap<BasicMessageConsumer, String>(); /** * Used in the consume method. We generate the consume tag on the client so that we can use the nowait @@ -143,6 +145,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private boolean _inRecovery; + /** * Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ @@ -176,7 +179,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { if (message.deliverBody != null) { - final BasicMessageConsumer consumer = _consumers.get(message.deliverBody.consumerTag); + final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.deliverBody.consumerTag); if (consumer == null) { @@ -210,17 +213,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage)); } + else if (errorCode == AMQConstant.NO_ROUTE.getCode()) + { + _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage)); + } else { - if (errorCode == AMQConstant.NO_ROUTE.getCode()) - { - _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage)); - } - else - { - _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); - } + _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); } + } catch (Exception e) { @@ -318,7 +319,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public BytesMessage createBytesMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); try @@ -334,7 +335,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MapMessage createMapMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); try @@ -350,7 +351,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public javax.jms.Message createMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); try @@ -366,7 +367,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public ObjectMessage createObjectMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); try @@ -382,7 +383,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public ObjectMessage createObjectMessage(Serializable object) throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); try @@ -400,7 +401,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public StreamMessage createStreamMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); @@ -417,7 +418,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TextMessage createTextMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); @@ -434,7 +435,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TextMessage createTextMessage(String text) throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); try @@ -504,7 +505,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { // We must close down all producers and consumers in an orderly fashion. This is the only method // that can be called from a different thread of control from the one controlling the session - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { //Ensure we only try and close an open session. if (!_closed.getAndSet(true)) @@ -569,7 +570,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public void closed(Throwable e) { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { // An AMQException has an error code and message already and will be passed in when closure occurs as a // result of a channel close request @@ -721,11 +722,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void acknowledge() throws JMSException { - if(isClosed()) + if (isClosed()) { throw new IllegalStateException("Session is already closed"); } - for(BasicMessageConsumer consumer : _consumers.values()) + for (BasicMessageConsumer consumer : _consumers.values()) { consumer.acknowledge(); } @@ -734,7 +735,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } - public MessageListener getMessageListener() throws JMSException { checkNotClosed(); @@ -843,7 +843,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false, false, null, - null); + null, + false, + false); } public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException @@ -855,7 +857,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false, false, messageSelector, - null); + null, + false, + false); } public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) @@ -868,7 +872,26 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi noLocal, false, messageSelector, - null); + null, + false, + false); + } + + public MessageConsumer createBrowserConsumer(Destination destination, + String messageSelector, + boolean noLocal) + throws JMSException + { + checkValidDestination(destination); + return createConsumerImpl(destination, + _defaultPrefetchHighMark, + _defaultPrefetchLowMark, + noLocal, + false, + messageSelector, + null, + true, + true); } public MessageConsumer createConsumer(Destination destination, @@ -878,7 +901,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String selector) throws JMSException { checkValidDestination(destination); - return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null); + return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false); } @@ -890,7 +913,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String selector) throws JMSException { checkValidDestination(destination); - return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null); + return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, false, false); } public MessageConsumer createConsumer(Destination destination, @@ -902,7 +925,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkValidDestination(destination); return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, - selector, rawSelector); + selector, rawSelector, false, false); } public MessageConsumer createConsumer(Destination destination, @@ -915,7 +938,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkValidDestination(destination); return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, - selector, rawSelector); + selector, rawSelector, false, false); } protected MessageConsumer createConsumerImpl(final Destination destination, @@ -924,7 +947,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi final boolean noLocal, final boolean exclusive, final String selector, - final FieldTable rawSelector) throws JMSException + final FieldTable rawSelector, + final boolean noConsume, + final boolean autoClose) throws JMSException { checkTemporaryDestination(destination); @@ -948,12 +973,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal, _messageFactoryRegistry, AMQSession.this, protocolHandler, ft, prefetchHigh, prefetchLow, exclusive, - _acknowledgeMode); + _acknowledgeMode, noConsume, autoClose); try { registerConsumer(consumer, false); } + catch (AMQInvalidSelectorException ise) + { + JMSException ex = new InvalidSelectorException(ise.getMessage()); + ex.setLinkedException(ise); + throw ex; + } catch (AMQException e) { JMSException ex = new JMSException("Error registering consumer: " + e); @@ -963,7 +994,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi synchronized(destination) { - _destinationConsumerCount.putIfAbsent(destination,new AtomicInteger()); + _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger()); _destinationConsumerCount.get(destination).incrementAndGet(); } @@ -975,16 +1006,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private void checkTemporaryDestination(Destination destination) throws JMSException { - if((destination instanceof TemporaryDestination)) + if ((destination instanceof TemporaryDestination)) { _logger.debug("destination is temporary"); final TemporaryDestination tempDest = (TemporaryDestination) destination; - if(tempDest.getSession() != this) + if (tempDest.getSession() != this) { _logger.debug("destination is on different session"); throw new JMSException("Cannot consume from a temporary destination created onanother session"); } - if(tempDest.isDeleted()) + if (tempDest.isDeleted()) { _logger.debug("destination is deleted"); throw new JMSException("Cannot consume from a deleted destination"); @@ -1065,12 +1096,26 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @return the consumer tag generated by the broker */ private void consumeFromQueue(BasicMessageConsumer consumer, String queueName, AMQProtocolHandler protocolHandler, - boolean nowait) throws AMQException + boolean nowait, String messageSelector) throws AMQException { //fixme prefetch values are not used here. Do we need to have them as parametsrs? //need to generate a consumer tag on the client so we can exploit the nowait flag String tag = Integer.toString(_nextTag++); + FieldTable arguments = FieldTableFactory.newFieldTable(); + if (messageSelector != null && !messageSelector.equals("")) + { + arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector); + } + if(consumer.isAutoClose()) + { + arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE); + } + if(consumer.isNoConsume()) + { + arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE); + } + consumer.setConsumerTag(tag); // we must register the consumer in the map before we actually start listening _consumers.put(tag, consumer); @@ -1080,7 +1125,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0, queueName, tag, consumer.isNoLocal(), consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, - consumer.isExclusive(), nowait, null); + consumer.isExclusive(), nowait, arguments); if (nowait) { protocolHandler.writeFrame(jmsConsume); @@ -1220,7 +1265,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); checkValidTopic(topic); - AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic)topic, name, _connection); + AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); TopicSubscriberAdaptor subscriber = _subscriptions.get(name); if (subscriber != null) { @@ -1247,8 +1292,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); - _subscriptions.put(name,subscriber); - _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name); + _subscriptions.put(name, subscriber); + _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); return subscriber; } @@ -1278,8 +1323,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer); - _subscriptions.put(name,subscriber); - _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name); + _subscriptions.put(name, subscriber); + _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); return subscriber; } @@ -1291,16 +1336,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public QueueBrowser createBrowser(Queue queue) throws JMSException { - checkNotClosed(); - checkValidQueue(queue); - throw new UnsupportedOperationException("Queue browsing not supported"); + return createBrowser(queue, null); } public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { checkNotClosed(); checkValidQueue(queue); - throw new UnsupportedOperationException("Queue browsing not supported"); + return new AMQQueueBrowser(this, (AMQQueue) queue,messageSelector); } public TemporaryQueue createTemporaryQueue() throws JMSException @@ -1476,7 +1519,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); - consumeFromQueue(consumer, queueName, protocolHandler, nowait); + try + { + consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector()); + } + catch (JMSException e) //thrown by getMessageSelector + { + throw new AMQException(e.getMessage(), e); + } } /** @@ -1489,7 +1539,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { _consumers.remove(consumer.getConsumerTag()); String subscriptionName = _reverseSubscriptionMap.remove(consumer); - if(subscriptionName != null) + if (subscriptionName != null) { _subscriptions.remove(subscriptionName); } @@ -1497,7 +1547,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi Destination dest = consumer.getDestination(); synchronized(dest) { - if(_destinationConsumerCount.get(dest).decrementAndGet() == 0) + if (_destinationConsumerCount.get(dest).decrementAndGet() == 0) { _destinationConsumerCount.remove(dest); } @@ -1567,6 +1617,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _connection.getProtocolHandler().writeFrame(channelFlowFrame); } + public void confirmConsumerCancelled(String consumerTag) + { + BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); + if((consumer != null) && (consumer.isAutoClose())) + { + consumer.closeWhenNoMessages(true); + } + } + + /* * I could have combined the last 3 methods, but this way it improves readability */ @@ -1576,7 +1636,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { throw new javax.jms.InvalidDestinationException("Invalid Topic"); } - if((topic instanceof TemporaryDestination) && ((TemporaryDestination)topic).getSession() != this) + if ((topic instanceof TemporaryDestination) && ((TemporaryDestination) topic).getSession() != this) { throw new JMSException("Cannot create a subscription on a temporary topic created in another session"); } @@ -1597,4 +1657,5 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi throw new javax.jms.InvalidDestinationException("Invalid Queue"); } } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index f0d3cf5abc..cefaca8d52 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -145,10 +145,19 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer */ private Thread _receivingThread; + /** + * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive + * on the queue. This is used for queue browsing. + */ + private boolean _autoClose; + private boolean _closeWhenNoMessages; + + private boolean _noConsume; + protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, - boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, - AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, - int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode) + boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, + AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, + int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) { _channelId = channelId; _connection = connection; @@ -164,6 +173,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _exclusive = exclusive; _acknowledgeMode = acknowledgeMode; _synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true); + _autoClose = autoClose; + _noConsume = noConsume; } public AMQDestination getDestination() @@ -321,6 +332,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { + if(closeOnAutoClose()) + { + return null; + } Object o = null; if (l > 0) { @@ -350,6 +365,19 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } + private boolean closeOnAutoClose() throws JMSException + { + if(isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty()) + { + close(false); + return true; + } + else + { + return false; + } + } + public Message receiveNoWait() throws JMSException { checkPreConditions(); @@ -358,6 +386,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { + if(closeOnAutoClose()) + { + return null; + } Object o = _synchronousQueue.poll(); final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) @@ -402,22 +434,31 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } + public void close() throws JMSException { + close(true); + } + + public void close(boolean sendClose) throws JMSException + { synchronized(_connection.getFailoverMutex()) { if (!_closed.getAndSet(true)) { - final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false); - - try - { - _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class); - } - catch (AMQException e) + if(sendClose) { - _logger.error("Error closing consumer: " + e, e); - throw new JMSException("Error closing consumer: " + e); + final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false); + + try + { + _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class); + } + catch (AMQException e) + { + _logger.error("Error closing consumer: " + e, e); + throw new JMSException("Error closing consumer: " + e); + } } deregisterConsumer(); @@ -513,6 +554,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer msg.setJMSDestination(_destination); switch (_acknowledgeMode) { + case Session.CLIENT_ACKNOWLEDGE: + if (isNoConsume()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } + break; case Session.DUPS_OK_ACKNOWLEDGE: if (++_outstanding >= _prefetchHigh) { @@ -539,7 +586,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } break; case Session.SESSION_TRANSACTED: - _lastDeliveryTag = msg.getDeliveryTag(); + if (isNoConsume()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } + else + { + _lastDeliveryTag = msg.getDeliveryTag(); + } break; } } @@ -630,4 +684,29 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _unacknowledgedDeliveryTags.clear(); } + + public boolean isAutoClose() + { + return _autoClose; + } + + + public boolean isNoConsume() + { + return _noConsume; + } + + public void closeWhenNoMessages(boolean b) + { + _closeWhenNoMessages = b; + + if(_closeWhenNoMessages + && _synchronousQueue.isEmpty() + && _receiving.get() + && _messageListener != null) + { + _receivingThread.interrupt(); + } + + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java b/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java index 6ab7808110..9ee802ff10 100644 --- a/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java +++ b/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java @@ -20,16 +20,16 @@ */ package org.apache.qpid.client; -import java.util.Enumeration; +import org.apache.qpid.common.QpidProperties; import javax.jms.ConnectionMetaData; import javax.jms.JMSException; +import java.util.Enumeration; public class QpidConnectionMetaData implements ConnectionMetaData { - QpidConnectionMetaData(AMQConnection conn) { } @@ -46,7 +46,7 @@ public class QpidConnectionMetaData implements ConnectionMetaData public String getJMSProviderName() throws JMSException { - return "Apache Qpid"; + return "Apache " + QpidProperties.getProductName(); } public String getJMSVersion() throws JMSException @@ -71,8 +71,8 @@ public class QpidConnectionMetaData implements ConnectionMetaData public String getProviderVersion() throws JMSException { - return "QPID (Client: [" + getClientVersion() + "] ; Broker [" + getBrokerVersion() + "] ; Protocol: [ " - + getProtocolVersion() + "] )"; + return QpidProperties.getProductName() + " (Client: [" + getClientVersion() + "] ; Broker [" + getBrokerVersion() + "] ; Protocol: [ " + + getProtocolVersion() + "] )"; } private String getProtocolVersion() @@ -89,8 +89,7 @@ public class QpidConnectionMetaData implements ConnectionMetaData public String getClientVersion() { - // TODO - get client build version from properties file or similar - return "<unknown>"; + return QpidProperties.getBuildVersion(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java new file mode 100644 index 0000000000..d855e97204 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java @@ -0,0 +1,35 @@ +package org.apache.qpid.client.handler; + +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.client.BasicMessageConsumer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.ExchangeBoundOkBody; +import org.apache.qpid.framing.BasicCancelOkBody; +import org.apache.log4j.Logger; + +/** + * @author Apache Software Foundation + */ +public class BasicCancelOkMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(BasicCancelOkMethodHandler.class); + private static final BasicCancelOkMethodHandler _instance = new BasicCancelOkMethodHandler(); + + public static BasicCancelOkMethodHandler getInstance() + { + return _instance; + } + + private BasicCancelOkMethodHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + _logger.debug("New BasicCancelOk method received"); + BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod(); + evt.getProtocolSession().confirmConsumerCancelled(evt.getChannelId(), body.consumerTag); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java index 2bd93f1508..fd2968cdfd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java @@ -23,6 +23,7 @@ package org.apache.qpid.client.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQChannelClosedException; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInvalidSelectorException; import org.apache.qpid.client.AMQNoConsumersException; import org.apache.qpid.client.AMQNoRouteException; import org.apache.qpid.protocol.AMQConstant; @@ -46,7 +47,7 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException { - _logger.debug("ChannelClose method received"); + _logger.debug("ChannelClose method received"); ChannelCloseBody method = (ChannelCloseBody) evt.getMethod(); int errorCode = method.replyCode; @@ -65,17 +66,21 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener { throw new AMQNoConsumersException("Error: " + reason, null); } + else if (errorCode == AMQConstant.NO_ROUTE.getCode()) + { + throw new AMQNoRouteException("Error: " + reason, null); + } + else if (errorCode == AMQConstant.INVALID_SELECTOR.getCode()) + { + _logger.info("Broker responded with Invalid Selector."); + + throw new AMQInvalidSelectorException(reason); + } else { - if (errorCode == AMQConstant.NO_ROUTE.getCode()) - { - throw new AMQNoRouteException("Error: " + reason, null); - } - else - { - throw new AMQChannelClosedException(errorCode, "Error: " + reason); - } + throw new AMQChannelClosedException(errorCode, "Error: " + reason); } + } evt.getProtocolSession().channelClosed(evt.getChannelId(), errorCode, reason); } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java index 9333df3fe4..1e0366ec4d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java @@ -22,6 +22,8 @@ package org.apache.qpid.client.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.common.ClientProperties; +import org.apache.qpid.common.QpidProperties; import org.apache.qpid.client.protocol.AMQMethodEvent; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.security.AMQCallbackHandler; @@ -119,10 +121,12 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); FieldTable clientProperties = FieldTableFactory.newFieldTable(); - clientProperties.put("instance", ps.getClientID()); - clientProperties.put("product", "Qpid"); - clientProperties.put("version", "1.0"); - clientProperties.put("platform", getFullSystemInfo()); + + clientProperties.put(ClientProperties.instance.toString(), ps.getClientID()); + _log.info("Product name: " + QpidProperties.getProductName()); + clientProperties.put(ClientProperties.product.toString(), QpidProperties.getProductName()); + clientProperties.put(ClientProperties.version.toString(), QpidProperties.getReleaseVersion()); + clientProperties.put(ClientProperties.platform.toString(), getFullSystemInfo()); ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), clientProperties, mechanism, saslResponse, selectedLocale)); } @@ -130,6 +134,10 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener { throw new AMQException(_log, "Unable to decode data: " + e, e); } + catch (Throwable t) + { + _log.error("Error: " + t, t); + } } private String getFullSystemInfo() diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index fea7a29594..40d8b28411 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -26,7 +26,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpid.client.*; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.BasicMessageConsumer; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; @@ -46,7 +47,8 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach protected ByteBuffer _data; private boolean _readableProperties = false; - private boolean _readableMessage = false; + protected boolean _readableMessage = false; + protected boolean _changedData; private Destination _destination; private BasicMessageConsumer _consumer; @@ -60,6 +62,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach } _readableProperties = false; _readableMessage = (data != null); + _changedData = (data == null); } protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) throws AMQException @@ -521,16 +524,16 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach return !_readableMessage; } - public void reset() + public void reset() { - if (_readableMessage) + if (!_changedData) { _data.rewind(); } else { _data.flip(); - _readableMessage = true; + _changedData = false; } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java index f5c9f7111a..d769300c69 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java @@ -59,6 +59,12 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag super(messageNbr, contentHeader, data); } + public void reset() + { + super.reset(); + _readableMessage = true; + } + public String getMimeType() { return MIME_TYPE; @@ -226,48 +232,56 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag public void writeBoolean(boolean b) throws JMSException { checkWritable(); + _changedData = true; _data.put(b ? (byte) 1 : (byte) 0); } public void writeByte(byte b) throws JMSException { checkWritable(); + _changedData = true; _data.put(b); } public void writeShort(short i) throws JMSException { checkWritable(); + _changedData = true; _data.putShort(i); } public void writeChar(char c) throws JMSException { checkWritable(); + _changedData = true; _data.putChar(c); } public void writeInt(int i) throws JMSException { checkWritable(); + _changedData = true; _data.putInt(i); } public void writeLong(long l) throws JMSException { checkWritable(); + _changedData = true; _data.putLong(l); } public void writeFloat(float v) throws JMSException { checkWritable(); + _changedData = true; _data.putFloat(v); } public void writeDouble(double v) throws JMSException { checkWritable(); + _changedData = true; _data.putDouble(v); } @@ -281,7 +295,7 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag _data.putShort((short)encodedString.limit()); _data.put(encodedString); - + _changedData = true; //_data.putString(string, Charset.forName("UTF-8").newEncoder()); // we must add the null terminator manually //_data.put((byte)0); @@ -298,12 +312,14 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag { checkWritable(); _data.put(bytes); + _changedData = true; } public void writeBytes(byte[] bytes, int offset, int length) throws JMSException { checkWritable(); _data.put(bytes, offset, length); + _changedData = true; } public void writeObject(Object object) throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java index 4fb070d2ff..35c5377f14 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java @@ -112,7 +112,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag } } - + public Serializable getObject() throws JMSException { ObjectInputStream in = null; @@ -123,18 +123,18 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag try { - _data.rewind(); + _data.rewind(); in = new ObjectInputStream(_data.asInputStream()); return (Serializable) in.readObject(); } catch (IOException e) - { - e.printStackTrace(); - throw new MessageFormatException("Could not deserialize message: " + e); + { + e.printStackTrace(); + throw new MessageFormatException("Could not deserialize message: " + e); } catch (ClassNotFoundException e) { - e.printStackTrace(); + e.printStackTrace(); throw new MessageFormatException("Could not deserialize message: " + e); } finally diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index c2dfdc1b65..6709ff802d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -86,6 +86,12 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess super(messageNbr, contentHeader, data); } + public void reset() + { + super.reset(); + _readableMessage = true; + } + public String getMimeType() { return MIME_TYPE; @@ -103,6 +109,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess { checkWritable(); _data.put(type); + _changedData = true; } public boolean readBoolean() throws JMSException @@ -693,7 +700,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess { _data.putString(string, Charset.forName("UTF-8").newEncoder()); // we must write the null terminator ourselves - _data.put((byte)0); + _data.put((byte) 0); } catch (CharacterCodingException e) { @@ -706,7 +713,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public void writeBytes(byte[] bytes) throws JMSException { - writeBytes(bytes, 0, bytes == null?0:bytes.length); + writeBytes(bytes, 0, bytes == null ? 0 : bytes.length); } public void writeBytes(byte[] bytes, int offset, int length) throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java index 76f8a1c32f..d8394b0489 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java @@ -117,6 +117,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text { _data.put(text.getBytes(getJmsContentHeaderProperties().getEncoding())); } + _changedData=true; } _decodedValue = text; } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index a4ed89719b..6a40fd3133 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -406,4 +406,12 @@ public class AMQProtocolSession implements ProtocolVersionList HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); } } + + public void confirmConsumerCancelled(int channelId, String consumerTag) + { + final Integer chId = channelId; + final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId); + + session.confirmConsumerCancelled(consumerTag); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java index 492571b6af..21ae3fc71f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java @@ -110,7 +110,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener } else { - throw new AMQException("Woken up due to exception", _error); // FIXME: This will wrap FailoverException and prevent it being caught. + throw new AMQException("Woken up due to " + _error.getClass(), _error); // FIXME: This will wrap FailoverException and prevent it being caught. } } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index 887850c06e..50bd1667f9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -103,6 +103,7 @@ public class AMQStateManager implements AMQMethodListener frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance()); frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance()); frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance()); + frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance()); frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance()); frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance()); frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance()); diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java index 0de2850080..d6364f45b0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java @@ -59,7 +59,7 @@ public class SocketTransportConnection implements ITransportConnection // once more testing of the performance of the simple allocator has been done if (!Boolean.getBoolean("amqj.enablePooledAllocator")) { - _logger.warn("Using SimpleByteBufferAllocator"); + _logger.info("Using SimpleByteBufferAllocator"); ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); } diff --git a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java index 53e7fd066e..5497cafed4 100644 --- a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java +++ b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java @@ -40,11 +40,15 @@ import javax.naming.spi.InitialContextFactory; import java.util.Hashtable; import java.util.Iterator; import java.util.Map; +import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.IOException; public class PropertiesFileInitialContextFactory implements InitialContextFactory { - protected final Logger _logger = Logger.getLogger(getClass()); + protected final Logger _logger = Logger.getLogger(PropertiesFileInitialContextFactory.class); private String CONNECTION_FACTORY_PREFIX = "connectionfactory."; private String DESTINATION_PREFIX = "destination."; @@ -55,6 +59,41 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor { Map data = new ConcurrentHashMap(); + try + { + + String file = null; + if (environment.contains(Context.PROVIDER_URL)) + { + file = (String) environment.get(Context.PROVIDER_URL); + } + else + { + file = System.getProperty(Context.PROVIDER_URL); + } + + if (file != null) + { + _logger.info("Loading Properties from:" + file); + //Load the properties specified + Properties p = new Properties(); + + p.load(new BufferedInputStream(new FileInputStream(file))); + + environment.putAll(p); + _logger.info("Loaded Context Properties:" + environment.toString()); + } + else + { + _logger.warn("No Provider URL specified."); + } + } + catch (IOException ioe) + { + _logger.warn("Unable to load property file specified in Provider_URL:" + + environment.get(Context.PROVIDER_URL)); + } + createConnectionFactories(data, environment); createDestinations(data, environment); diff --git a/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java b/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java index a1e15258c3..f7a1502347 100644 --- a/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java +++ b/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java @@ -29,6 +29,7 @@ import javax.naming.Context; import javax.jms.*; import javax.jms.MessageConsumer; import javax.jms.Session; +import javax.jms.Message; import java.util.Hashtable; import java.io.File; import java.io.FilenameFilter; diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java index 7f76baa157..17679788bd 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java @@ -116,7 +116,9 @@ public class PropertyValueTest extends TestCase implements MessageListener m.setIntProperty("Int", (int) Integer.MAX_VALUE); m.setJMSCorrelationID("Correlation"); - m.setJMSPriority(100); + //fixme the m.setJMSMessage has no effect + producer.setPriority(8); + m.setJMSPriority(3); // Queue Queue q; @@ -182,10 +184,8 @@ public class PropertyValueTest extends TestCase implements MessageListener (int) Integer.MAX_VALUE, m.getIntProperty("Int")); Assert.assertEquals("Check CorrelationID properties are correctly transported", "Correlation", m.getJMSCorrelationID()); - - _logger.warn("getJMSPriority not being verified."); -// Assert.assertEquals("Check Priority properties are correctly transported", -// 100, m.getJMSPriority()); + Assert.assertEquals("Check Priority properties are correctly transported", + 8, m.getJMSPriority()); // Queue Assert.assertEquals("Check ReplyTo properties are correctly transported", diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java new file mode 100644 index 0000000000..27a2ccb32e --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java @@ -0,0 +1,141 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.basic; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.BasicMessageProducer; +import org.apache.qpid.client.transport.TransportConnection; + +import org.apache.log4j.Logger; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.DeliveryMode; + +import junit.framework.TestCase; + +public class SelectorTest extends TestCase implements MessageListener +{ + + private final static Logger _logger = org.apache.log4j.Logger.getLogger(SelectorTest.class); + + private AMQConnection _connection; + private AMQDestination _destination; + private AMQSession _session; + private int count; + public String _connectionString = "vm://:1"; + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + TransportConnection.killAllVMBrokers(); + } + + private void init(AMQConnection connection) throws Exception + { + init(connection, new AMQQueue(randomize("SessionStartTest"), true)); + } + + private void init(AMQConnection connection, AMQDestination destination) throws Exception + { + _connection = connection; + _destination = destination; + connection.start(); + + + String selector = null; +// selector = "Cost = 2 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT; +// selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT; + + _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); + //_session.createConsumer(destination).setMessageListener(this); + _session.createConsumer(destination, selector).setMessageListener(this); + } + + public synchronized void test() throws JMSException, InterruptedException + { + try + { + Message msg = _session.createTextMessage("Message"); + msg.setJMSPriority(1); + msg.setIntProperty("Cost", 2); + msg.setJMSType("Special"); + + _logger.info("Sending Message:" + msg); + + ((BasicMessageProducer) _session.createProducer(_destination)).send(msg, DeliveryMode.NON_PERSISTENT); + System.out.println("Message sent, waiting for response..."); + wait(1000); + + if (count > 0) + { + _logger.info("Got message"); + } + + if (count == 0) + { + fail("Did not get message!"); + //throw new RuntimeException("Did not get message!"); + } + } + finally + { + _session.close(); + _connection.close(); + } + } + + public synchronized void onMessage(Message message) + { + count++; + _logger.info("Got Message:" + message); + notify(); + } + + private static String randomize(String in) + { + return in + System.currentTimeMillis(); + } + + public static void main(String[] argv) throws Exception + { + SelectorTest test = new SelectorTest(); + test._connectionString = argv.length == 0 ? "localhost:5672" : argv[0]; + test.setUp(); + test.test(); + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(SelectorTest.class); + } +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java index 79bd4f6dde..5bce3f64a2 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java @@ -121,7 +121,7 @@ public class ChannelCloseOkTest extends TestCase { if (_connection != null) { - System.out.println(">>>>>>>>>>>>>>.. closing"); + _log.info(">>>>>>>>>>>>>>.. closing"); _connection.close(); } } @@ -137,7 +137,7 @@ public class ChannelCloseOkTest extends TestCase { public void onException(JMSException jmsException) { - _log.error("onException - ", jmsException); + _log.warn("onException - "+jmsException.getMessage()); } }); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java index 0005b20fb1..3022c8a59d 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java @@ -20,6 +20,8 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.qpid.server.cluster.util.LogMessage; +import java.util.List; + class ClusteredSubscriptionManager extends SubscriptionSet { private static final Logger _logger = Logger.getLogger(ClusteredSubscriptionManager.class); @@ -79,6 +81,11 @@ class ClusteredSubscriptionManager extends SubscriptionSet return ClusteredSubscriptionManager.this.getWeight(); } + public List<Subscription> getSubscriptions() + { + return ClusteredSubscriptionManager.super.getSubscriptions(); + } + public boolean hasActiveSubscribers() { return ClusteredSubscriptionManager.super.hasActiveSubscribers(); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java index 0bb6537930..eec2563480 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java @@ -18,12 +18,12 @@ package org.apache.qpid.server.queue; import java.util.List; +import java.util.LinkedList; import java.util.concurrent.CopyOnWriteArrayList; /** * Distributes messages among a list of subsscription managers, using their * weighting. - * */ class NestedSubscriptionManager implements SubscriptionManager { @@ -41,11 +41,24 @@ class NestedSubscriptionManager implements SubscriptionManager _subscribers.remove(s); } + + public List<Subscription> getSubscriptions() + { + List<Subscription> allSubs = new LinkedList<Subscription>(); + + for (WeightedSubscriptionManager subMans : _subscribers) + { + allSubs.addAll(subMans.getSubscriptions()); + } + + return allSubs; + } + public boolean hasActiveSubscribers() { - for(WeightedSubscriptionManager s : _subscribers) + for (WeightedSubscriptionManager s : _subscribers) { - if(s.hasActiveSubscribers()) + if (s.hasActiveSubscribers()) { return true; } @@ -56,9 +69,9 @@ class NestedSubscriptionManager implements SubscriptionManager public Subscription nextSubscriber(AMQMessage msg) { WeightedSubscriptionManager start = current(); - for(WeightedSubscriptionManager s = start; s != null; s = next(start)) + for (WeightedSubscriptionManager s = start; s != null; s = next(start)) { - if(hasMore(s)) + if (hasMore(s)) { return nextSubscriber(s); } @@ -91,7 +104,7 @@ class NestedSubscriptionManager implements SubscriptionManager private WeightedSubscriptionManager next() { _iterations = 0; - if(++_index >= _subscribers.size()) + if (++_index >= _subscribers.size()) { _index = 0; } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java index eabf374e81..06dfd29cab 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java @@ -22,6 +22,9 @@ import org.apache.qpid.server.cluster.GroupManager; import org.apache.qpid.server.cluster.SimpleSendable; import org.apache.qpid.AMQException; +import java.util.Queue; +import java.util.List; + class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManager { private final GroupManager _groupMgr; @@ -73,6 +76,11 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage return _count; } + public List<Subscription> getSubscriptions() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + public boolean hasActiveSubscribers() { return getWeight() == 0; @@ -85,9 +93,49 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage public void queueDeleted(AMQQueue queue) { - if(queue instanceof ClusteredQueue) + if (queue instanceof ClusteredQueue) { ((ClusteredQueue) queue).removeAllRemoteSubscriber(_peer); } } + + public boolean hasFilters() + { + return false; + } + + public boolean hasInterest(AMQMessage msg) + { + return true; + } + + public Queue<AMQMessage> getPreDeliveryQueue() + { + return null; + } + + public void enqueueForPreDelivery(AMQMessage msg) + { + //no-op -- if selectors are implemented on RemoteSubscriptions then look at SubscriptionImpl + } + + public boolean isAutoClose() + { + return false; + } + + public void close() + { + //no-op + } + + public boolean isBrowser() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public void sendNextMessage(AMQQueue queue) + { + + } } diff --git a/java/common/src/main/java/log4j.properties b/java/common/src/main/java/log4j.properties new file mode 100644 index 0000000000..6d596d1d19 --- /dev/null +++ b/java/common/src/main/java/log4j.properties @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +log4j.rootLogger=${root.logging.level} + + +log4j.logger.org.apache.qpid=${amqj.logging.level}, console +log4j.additivity.org.apache.qpid=false + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.Threshold=all +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n diff --git a/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java b/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java new file mode 100644 index 0000000000..dcd039b789 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid; + +import org.apache.qpid.protocol.AMQConstant; + +public class AMQInvalidSelectorException extends AMQException +{ + public AMQInvalidSelectorException(String message) + { + super(AMQConstant.INVALID_SELECTOR.getCode(),message); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java b/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java new file mode 100644 index 0000000000..56219755a3 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.common; + +public enum AMQPFilterTypes +{ + JMS_SELECTOR("x-filter-jms-selector"), + NO_CONSUME("x-filter-no-consume"), + AUTO_CLOSE("x-filter-auto-close"); + + private final String _value; + + AMQPFilterTypes(String value) + { + _value = value; + } + + public String getValue() + { + return _value; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java new file mode 100644 index 0000000000..07371b5182 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.common; + +public enum ClientProperties +{ + instance, + product, + version, + platform +} diff --git a/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java b/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java new file mode 100644 index 0000000000..f4f764db1b --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.common; + +import org.apache.log4j.Logger; + +import java.util.Properties; +import java.util.Map; +import java.io.IOException; +import java.io.InputStream; + +public class QpidProperties +{ + private static final Logger _logger = Logger.getLogger(QpidProperties.class); + + public static final String VERSION_RESOURCE = "qpidversion.properties"; + + public static final String PRODUCT_NAME_PROPERTY = "qpid.name"; + public static final String RELEASE_VERSION_PROPERTY = "qpid.version"; + public static final String BUILD_VERSION_PROPERTY = "qpid.svnversion"; + + private static final String DEFAULT = "unknown"; + + private static String productName = DEFAULT; + private static String releaseVersion = DEFAULT; + private static String buildVersion = DEFAULT; + + /** Loads the values from the version properties file. */ + static + { + Properties props = new Properties(); + + try + { + InputStream propertyStream = QpidProperties.class.getClassLoader().getResourceAsStream(VERSION_RESOURCE); + if (propertyStream == null) + { + _logger.warn("Unable to find resource " + VERSION_RESOURCE + " from classloader"); + } + else + { + props.load(propertyStream); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Dumping QpidProperties"); + for (Map.Entry<Object,Object> entry : props.entrySet()) + { + _logger.debug("Property: " + entry.getKey() + " Value: "+ entry.getValue()); + } + _logger.debug("End of property dump"); + } + + productName = readPropertyValue(props, PRODUCT_NAME_PROPERTY); + releaseVersion = readPropertyValue(props, RELEASE_VERSION_PROPERTY); + buildVersion = readPropertyValue(props, BUILD_VERSION_PROPERTY); + } + } + catch (IOException e) + { + // Log a warning about this and leave the values initialized to unknown. + _logger.error("Could not load version.properties resource: " + e, e); + } + } + + public static String getProductName() + { + return productName; + } + + public static String getReleaseVersion() + { + return releaseVersion; + } + + public static String getBuildVersion() + { + return buildVersion; + } + + public static String getVersionString() + { + return getProductName() + " - " + getReleaseVersion() + " build: " + getBuildVersion(); + } + + private static String readPropertyValue(Properties props, String propertyName) + { + String retVal = (String) props.get(propertyName); + if (retVal == null) + { + retVal = DEFAULT; + } + return retVal; + } + + public static void main(String[] args) + { + System.out.println(getVersionString()); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java index 1292ff2f6e..4b8f56e4e8 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java +++ b/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java @@ -129,7 +129,7 @@ public class PropertyFieldTable implements FieldTable } catch (Exception e) { - _logger.warn("Unable to decode PropertyFieldTable format:" + textFormat, e); + _logger.warn("Unable to decode PropertyFieldTable format:" + textFormat); throw new IllegalArgumentException("Unable to decode PropertyFieldTable format:" + textFormat); } } @@ -483,7 +483,7 @@ public class PropertyFieldTable implements FieldTable { return _properties.containsKey(name) && (_properties.get(name) == null) && _propertyNamesTypeMap.get(name).equals(Prefix.AMQP_NULL_STRING_PROPERTY_PREFIX); - + } @@ -606,7 +606,8 @@ public class PropertyFieldTable implements FieldTable // AMQ start character if (!(Character.isLetter(propertyName.charAt(0)) || propertyName.charAt(0) == '$' - || propertyName.charAt(0) == '#')) + || propertyName.charAt(0) == '#' + || propertyName.charAt(0) == '_')) // Not official AMQP added for JMS. { throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid AMQP start character"); } @@ -1156,9 +1157,9 @@ public class PropertyFieldTable implements FieldTable if (type == null) { String msg = "Field '" + key + "' - unsupported field table type: " + type + "."; - //some extra trace information... - msg += " (" + iType + "), length=" + length + ", sizeRead=" + sizeRead + ", sizeRemaining=" + sizeRemaining; - throw new AMQFrameDecodingException(msg); + //some extra trace information... + msg += " (" + iType + "), length=" + length + ", sizeRead=" + sizeRead + ", sizeRemaining=" + sizeRemaining; + throw new AMQFrameDecodingException(msg); } Object value; @@ -1203,7 +1204,7 @@ public class PropertyFieldTable implements FieldTable value = EncodingUtils.readBytes(buffer); break; default: - String msg = "Internal error, the following type identifier is not handled: " + type; + String msg = "Internal error, the following type identifier is not handled: " + type; throw new AMQFrameDecodingException(msg); } diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java index fc83c0726d..a0d243ca30 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java @@ -55,7 +55,7 @@ public final class AMQConstant { return _name; } - + public static final AMQConstant FRAME_MIN_SIZE = new AMQConstant(4096, "frame min size", true); public static final AMQConstant FRAME_END = new AMQConstant(206, "frame end", true); @@ -74,6 +74,8 @@ public final class AMQConstant public static final AMQConstant CONTEXT_UNKNOWN = new AMQConstant(321, "context unknown", true); + public static final AMQConstant INVALID_SELECTOR = new AMQConstant(322, "selector invalid", true); + public static final AMQConstant INVALID_PATH = new AMQConstant(402, "invalid path", true); public static final AMQConstant ACCESS_REFUSED = new AMQConstant(403, "access refused", true); diff --git a/java/distribution/pom.xml b/java/distribution/pom.xml new file mode 100644 index 0000000000..ca91c222ee --- /dev/null +++ b/java/distribution/pom.xml @@ -0,0 +1,142 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-distribution</artifactId> + <packaging>jar</packaging> + <name>Qpid Distribution</name> + <version>1.0-incubating-M2-SNAPSHOT</version> + + <parent> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid</artifactId> + <version>1.0-incubating-M2-SNAPSHOT</version> + </parent> + + <properties> + <topDirectoryLocation>..</topDirectoryLocation> + <java.source.version>1.5</java.source.version> + <qpid.version>${pom.version}</qpid.version> + <qpid.targetDir>${project.build.directory}</qpid.targetDir> + </properties> + + <repositories> + <repository> + <id>repo1.maven.org</id> + <name>Maven eclipse Repository</name> + <url>http://repo1.maven.org/eclipse</url> + </repository> + </repositories> + + <dependencies> + <dependency> + <groupId>${pom.groupId}</groupId> + <artifactId>qpid-common</artifactId> + <version>${pom.version}</version> + </dependency> + <dependency> + <groupId>${pom.groupId}</groupId> + <artifactId>qpid-broker</artifactId> + <version>${pom.version}</version> + </dependency> + <dependency> + <groupId>${pom.groupId}</groupId> + <artifactId>qpid-client</artifactId> + <version>${pom.version}</version> + </dependency> + <dependency> + <groupId>${pom.groupId}.management</groupId> + <artifactId>org.apache.qpid.management.ui</artifactId> + <version>${pom.version}</version> + </dependency> + </dependencies> + + <build> + <pluginManagement> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>${java.source.version}</source> + <target>${java.source.version}</target> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + <version>${assembly.version}</version> + <configuration> + <descriptors> + <descriptor>src/main/assembly/bin.xml</descriptor> + </descriptors> + <finalName>qpid-${pom.version}</finalName> + <outputDirectory>${qpid.targetDir}</outputDirectory> + <tarLongFileMode>gnu</tarLongFileMode> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <finalName>qpid-incubating</finalName> + <archive> + <manifest> + <addClasspath>true</addClasspath> + </manifest> + </archive> + </configuration> + </plugin> + </plugins> + </pluginManagement> + <resources> + <resource> + <directory>src/main/java</directory> + <includes> + <include>**/*</include> + </includes> + </resource> + </resources> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <executions> + <execution> + <id>distribution-package</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + <configuration> + <descriptors> + <descriptor>src/main/assembly/bin.xml</descriptor> + <descriptor>src/main/assembly/client-bin.xml</descriptor> + <descriptor>src/main/assembly/src.xml</descriptor> + <descriptor>src/main/assembly/management-eclipse-plugin.xml</descriptor> + <descriptor>src/main/assembly/management-eclipse-plugin-unix.xml</descriptor> + </descriptors> + <finalName>qpid-${pom.version}</finalName> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> diff --git a/java/distribution/src/main/assembly/bin.xml b/java/distribution/src/main/assembly/bin.xml new file mode 100644 index 0000000000..9b0a56a744 --- /dev/null +++ b/java/distribution/src/main/assembly/bin.xml @@ -0,0 +1,172 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +--> +<assembly> + <!-- id typically identifies the "type" (src vs bin etc) of the assembly --> + <id>java-bin</id> + <includeBaseDirectory>false</includeBaseDirectory> + <formats> + <format>tar.gz</format> + <format>zip</format> + </formats> + + <fileSets> + <fileSet> + <directory>src/main/release</directory> + <outputDirectory>qpid-${qpid.version}</outputDirectory> + <includes> + <include>DISCLAIMER</include> + </includes> + </fileSet> + <fileSet> + <directory>..</directory> + <outputDirectory>qpid-${qpid.version}</outputDirectory> + <includes> + <include>*.txt</include> + </includes> + </fileSet> + <fileSet> + <directory>src/main/release/etc</directory> + <outputDirectory>qpid-${qpid.version}/etc</outputDirectory> + <includes> + <include>logging.properties</include> + <include>log4j.properties</include> + </includes> + </fileSet> + <fileSet> + <directory>src/main/release/docs</directory> + <outputDirectory>qpid-${qpid.version}/docs</outputDirectory> + <includes> + <include>RELEASE_NOTES.txt</include> + </includes> + </fileSet> + <fileSet> + <directory>target</directory> + <outputDirectory>qpid-${qpid.version}/lib</outputDirectory> + <includes> + <include>qpid-incubating.jar</include> + </includes> + </fileSet> + </fileSets> + <files> + <!-- due to a bug in the assembly plugin (MASSEMBLY-153) you have + to use decimal numbers to specify fileMode --> + <file> + <source>../common/etc/qpid-run.conf</source> + <outputDirectory>qpid-${qpid.version}/etc</outputDirectory> + <destName>qpid-run.conf</destName> + <fileMode>420</fileMode> + </file> + <file> + <source>../broker/etc/config.xml</source> + <outputDirectory>qpid-${qpid.version}/etc</outputDirectory> + <destName>config.xml</destName> + <fileMode>420</fileMode> + </file> + <file> + <source>../broker/etc/log4j.xml</source> + <outputDirectory>qpid-${qpid.version}/etc</outputDirectory> + <destName>log4j.xml</destName> + <fileMode>420</fileMode> + </file> + <file> + <source>../broker/etc/passwd</source> + <outputDirectory>qpid-${qpid.version}/etc</outputDirectory> + <destName>passwd</destName> + <fileMode>420</fileMode> + </file> + <file> + <source>../broker/etc/qpid-server.conf</source> + <outputDirectory>qpid-${qpid.version}/etc</outputDirectory> + <destName>qpid-server.conf</destName> + <fileMode>420</fileMode> + </file> + <file> + <source>../broker/etc/virtualhosts.xml</source> + <outputDirectory>qpid-${qpid.version}/etc</outputDirectory> + <destName>virtualhosts.xml</destName> + <fileMode>420</fileMode> + </file> + <file> + <source>../common/bin/qpid-run</source> + <outputDirectory>qpid-${qpid.version}/bin</outputDirectory> + <destName>qpid-run</destName> + <fileMode>493</fileMode> + </file> + <file> + <source>../broker/bin/qpid-server</source> + <outputDirectory>qpid-${qpid.version}/bin</outputDirectory> + <destName>qpid-server</destName> + <fileMode>493</fileMode> + </file> + <file> + <source>../broker/bin/qpid-server.bat</source> + <outputDirectory>qpid-${qpid.version}/bin</outputDirectory> + <destName>qpid-server.bat</destName> + <fileMode>493</fileMode> + </file> + <file> + <source>../broker/bin/run.bat</source> + <outputDirectory>qpid-${qpid.version}/bin</outputDirectory> + <destName>run.bat</destName> + <fileMode>493</fileMode> + </file> + <file> + <source>../broker/bin/run.sh</source> + <outputDirectory>qpid-${qpid.version}/bin</outputDirectory> + <destName>run.sh</destName> + <fileMode>493</fileMode> + </file> + <file> + <source>../broker/bin/runAll</source> + <outputDirectory>qpid-${qpid.version}/bin</outputDirectory> + <destName>runAll</destName> + <fileMode>493</fileMode> + </file> + </files> + <dependencySets> + <dependencySet> + <outputDirectory>qpid-${qpid.version}/lib</outputDirectory> + <unpack>false</unpack> + <excludes> + <exclude>org.apache.qpid:qpid-distribution</exclude> + <exclude>org.apache.qpid.management:org.apache.qpid.management.ui</exclude> + <exclude>org.eclipse.core:org.eclipse.core.commands</exclude> + <exclude>org.eclipse.core:org.eclipse.core.contenttype</exclude> + <exclude>org.eclipse.core:org.eclipse.core.expressions</exclude> + <exclude>org.eclipse.core:org.eclipse.core.jobs</exclude> + <exclude>org.eclipse.core:org.eclipse.core.runtime</exclude> + <exclude>org.eclipse.core:org.eclipse.core.runtime.compatibility.auth</exclude> + <exclude>org.eclipse.core:org.eclipse.core.runtime.compatibility.registry</exclude> + <exclude>org.eclipse.equinox:org.eclipse.equinox.common</exclude> + <exclude>org.eclipse.equinox:org.eclipse.equinox.preferences</exclude> + <exclude>org.eclipse.equinox:org.eclipse.equinox.registry</exclude> + <exclude>org.eclipse.help:org.eclipse.help</exclude> + <exclude>org.eclipse.jface:org.eclipse.jface</exclude> + <exclude>org.eclipse.osgi:org.eclipse.osgi</exclude> + <exclude>org.eclipse.swt:org.eclipse.swt</exclude> + <exclude>org.eclipse.swt:org.eclipse.swt.win32.win32.x86</exclude> + <exclude>org.eclipse.ui:org.eclipse.ui</exclude> + <exclude>org.eclipse.ui:org.eclipse.ui.forms</exclude> + <exclude>org.eclipse.ui:org.eclipse.ui.workbench</exclude> + </excludes> + <scope>runtime</scope> + </dependencySet> + </dependencySets> +</assembly> diff --git a/java/distribution/src/main/assembly/client-bin.xml b/java/distribution/src/main/assembly/client-bin.xml new file mode 100644 index 0000000000..f89b1a39d2 --- /dev/null +++ b/java/distribution/src/main/assembly/client-bin.xml @@ -0,0 +1,108 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +--> +<assembly> + <id>java-client-bin</id> + <includeBaseDirectory>false</includeBaseDirectory> + <formats> + <format>tar.gz</format> + <format>zip</format> + </formats> + + <moduleSets> + <moduleSet> + <includes> + <include>org.apache.qpid:client</include> + </includes> + <binaries> + <includeDependencies>true</includeDependencies> + <unpack>false</unpack> + </binaries> + </moduleSet> + </moduleSets> + + <fileSets> + <fileSet> + <directory>src/main/release</directory> + <outputDirectory>qpid-${qpid.version}</outputDirectory> + <includes> + <include>DISCLAIMER</include> + </includes> + </fileSet> + <fileSet> + <directory>..</directory> + <outputDirectory>qpid-${qpid.version}</outputDirectory> + <includes> + <include>*.txt</include> + </includes> + </fileSet> + <fileSet> + <directory>src/main/release/etc</directory> + <outputDirectory>qpid-${qpid.version}/etc</outputDirectory> + <includes> + <include>logging.properties</include> + <include>log4j.properties</include> + </includes> + </fileSet> + <fileSet> + <directory>src/main/release/docs</directory> + <outputDirectory>qpid-${qpid.version}/docs</outputDirectory> + <includes> + <include>RELEASE_NOTES.txt</include> + </includes> + </fileSet> + <fileSet> + <directory>target</directory> + <outputDirectory>qpid-${qpid.version}/lib</outputDirectory> + <includes> + <include>qpid-incubating.jar</include> + </includes> + </fileSet> + </fileSets> + + <dependencySets> + <dependencySet> + <outputDirectory>qpid-${qpid.version}/lib</outputDirectory> + <unpack>false</unpack> + <excludes> + <exclude>org.apache.qpid:qpid-distribution</exclude> + <exclude>org.apache.qpid.management:org.apache.qpid.management.ui</exclude> + <exclude>org.eclipse.core:org.eclipse.core.commands</exclude> + <exclude>org.eclipse.core:org.eclipse.core.contenttype</exclude> + <exclude>org.eclipse.core:org.eclipse.core.expressions</exclude> + <exclude>org.eclipse.core:org.eclipse.core.jobs</exclude> + <exclude>org.eclipse.core:org.eclipse.core.runtime</exclude> + <exclude>org.eclipse.core:org.eclipse.core.runtime.compatibility.auth</exclude> + <exclude>org.eclipse.core:org.eclipse.core.runtime.compatibility.registry</exclude> + <exclude>org.eclipse.equinox:org.eclipse.equinox.common</exclude> + <exclude>org.eclipse.equinox:org.eclipse.equinox.preferences</exclude> + <exclude>org.eclipse.equinox:org.eclipse.equinox.registry</exclude> + <exclude>org.eclipse.help:org.eclipse.help</exclude> + <exclude>org.eclipse.jface:org.eclipse.jface</exclude> + <exclude>org.eclipse.osgi:org.eclipse.osgi</exclude> + <exclude>org.eclipse.swt:org.eclipse.swt</exclude> + <exclude>org.eclipse.swt:org.eclipse.swt.win32.win32.x86</exclude> + <exclude>org.eclipse.ui:org.eclipse.ui</exclude> + <exclude>org.eclipse.ui:org.eclipse.ui.forms</exclude> + <exclude>org.eclipse.ui:org.eclipse.ui.workbench</exclude> + </excludes> + <scope>runtime</scope> + </dependencySet> + </dependencySets> +</assembly> diff --git a/java/distribution/src/main/assembly/management-eclipse-plugin-unix.xml b/java/distribution/src/main/assembly/management-eclipse-plugin-unix.xml new file mode 100644 index 0000000000..5ac131b12b --- /dev/null +++ b/java/distribution/src/main/assembly/management-eclipse-plugin-unix.xml @@ -0,0 +1,112 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +--> +<assembly> + <!-- id typically identifies the "type" (src vs bin etc) of the assembly --> + <id>eclipse-plugin-unix</id> + <includeBaseDirectory>false</includeBaseDirectory> + <formats> + <format>tar.gz</format> + <format>zip</format> + </formats> +<!-- + <moduleSets> + <moduleSet> + <includes> + <include>org.apache.qpid.management:org.apache.qpid.management.ui</include> + </includes> + <binaries> + <includeDependencies>true</includeDependencies> + <unpack>false</unpack> + </binaries> + </moduleSet> + </moduleSets> + --> + <fileSets> + <fileSet> + <directory>src/main/release</directory> + <outputDirectory>qpidmc</outputDirectory> + <includes> + <include>DISCLAIMER</include> + </includes> + </fileSet> + <fileSet> + <directory>..</directory> + <outputDirectory>qpidmc</outputDirectory> + <includes> + <include>*.txt</include> + </includes> + </fileSet> + <fileSet> + <directory>src/main/release/docs</directory> + <outputDirectory>qpidmc/docs</outputDirectory> + <includes> + <include>RELEASE_NOTES.txt</include> + </includes> + </fileSet> + <fileSet> + <directory>../management/eclipse-plugin/src/main/resources/unix/configuration</directory> + <outputDirectory>qpidmc/configuration</outputDirectory> + <includes> + <include>**</include> + </includes> + </fileSet> + <fileSet> + <directory>../management/eclipse-plugin/src/main/resources</directory> + <outputDirectory>qpidmc</outputDirectory> + <includes> + <include>license.eclipse.txt</include> + </includes> + </fileSet> + <fileSet> + <directory>../management/eclipse-plugin/bin</directory> + <outputDirectory>qpidmc/bin</outputDirectory> + <includes> + <include>qpidmc.sh</include> + </includes> + <fileMode>777</fileMode> + </fileSet> + </fileSets> + + <dependencySets> + <dependencySet> + <outputDirectory>qpidmc/eclipse/plugins/org.eclipse.core.runtime.compatibility.registry_3.2.0</outputDirectory> + <outputFileNameMapping>${artifactId}_${version}/</outputFileNameMapping> + <unpack>true</unpack> + <includes> + <include>org.eclipse.core:org.eclipse.core.runtime.compatibility.registry</include> + </includes> + <scope>runtime</scope> + </dependencySet> + + <dependencySet> + <outputDirectory>qpidmc/eclipse/plugins</outputDirectory> + <outputFileNameMapping>${artifactId}_${version}.${extension}</outputFileNameMapping> + <unpack>false</unpack> + <excludes> + <exclude>org.apache.qpid:qpid-distribution</exclude> + </excludes> + <includes> + <include>org.eclipse.ui:org.eclipse.ui.forms</include> + <include>org.apache.qpid.management:org.apache.qpid.management.ui</include> + </includes> + <scope>runtime</scope> + </dependencySet> +</dependencySets> +</assembly> diff --git a/java/distribution/src/main/assembly/management-eclipse-plugin.xml b/java/distribution/src/main/assembly/management-eclipse-plugin.xml new file mode 100644 index 0000000000..f6c2399785 --- /dev/null +++ b/java/distribution/src/main/assembly/management-eclipse-plugin.xml @@ -0,0 +1,142 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +--> +<assembly> + <!-- id typically identifies the "type" (src vs bin etc) of the assembly --> + <id>management-console-win32</id> + <includeBaseDirectory>false</includeBaseDirectory> + <formats> + <format>zip</format> + </formats> +<!-- + <moduleSets> + <moduleSet> + <includes> + <include>org.apache.qpid.management:org.apache.qpid.management.ui</include> + </includes> + <binaries> + <includeDependencies>true</includeDependencies> + <unpack>false</unpack> + </binaries> + </moduleSet> + </moduleSets> + --> + <fileSets> + <fileSet> + <directory>src/main/release</directory> + <outputDirectory>qpidmc</outputDirectory> + <includes> + <include>DISCLAIMER</include> + </includes> + </fileSet> + <fileSet> + <directory>..</directory> + <outputDirectory>qpidmc</outputDirectory> + <includes> + <include>*.txt</include> + </includes> + </fileSet> + <fileSet> + <directory>src/main/release/docs</directory> + <outputDirectory>qpidmc/docs</outputDirectory> + <includes> + <include>RELEASE_NOTES.txt</include> + </includes> + </fileSet> + <fileSet> + <directory>../management/eclipse-plugin/src/main/resources/win32/configuration</directory> + <outputDirectory>qpidmc/configuration</outputDirectory> + <includes> + <include>**</include> + </includes> + </fileSet> + <fileSet> + <directory>../management/eclipse-plugin/src/main/resources</directory> + <outputDirectory>qpidmc/eclipse</outputDirectory> + <includes> + <include>*.*</include> + </includes> + </fileSet> + <fileSet> + <directory>../management/eclipse-plugin/bin</directory> + <outputDirectory>qpidmc/bin</outputDirectory> + <includes> + <include>**</include> + </includes> + <fileMode>777</fileMode> + </fileSet> + </fileSets> + + <dependencySets> + <dependencySet> + <outputDirectory>qpidmc/eclipse/plugins</outputDirectory> + <outputFileNameMapping>${artifactId}_${version}.${extension}</outputFileNameMapping> + <unpack>false</unpack> + <excludes> + <exclude>org.apache.qpid:qpid-distribution</exclude> + <exclude>org.apache.qpid:qpid-common</exclude> + <exclude>org.apache.qpid:qpid-broker</exclude> + <exclude>org.apache.qpid:qpid-client</exclude> + <exclude>commons-cli:commons-cli</exclude> + <exclude>commons-configuration:commons-configuration</exclude> + <exclude>commons-lang:commons-lang</exclude> + <exclude>org.apache.mina:mina-filter-ssl</exclude> + <exclude>org.apache.mina:mina-java5</exclude> + <exclude>backport-util-concurrent:backport-util-concurrent</exclude> + <exclude>org.slf4j:slf4j-simple</exclude> + <exclude>junit:junit</exclude> + <exclude>org.easymock:easymockclassextension</exclude> + <exclude>commons-codec:commons-codec</exclude> + <exclude>org.apache.geronimo.specs:geronimo-jms_1.1_spec</exclude> + <exclude>commons-collections:commons-collections</exclude> + <exclude>commons-lang:commons-lang</exclude> + <exclude>org.apache.mina:mina-core</exclude> + <exclude>commons-beanutils:commons-beanutils</exclude> + <exclude>commons-beanutils:commons-beanutils-core</exclude> + <exclude>commons-digester:commons-digester</exclude> + <exclude>commons-logging:commons-logging</exclude> + <exclude>commons-logging:commons-logging-api</exclude> + <exclude>dom4j:dom4j</exclude> + <exclude>isorelax:isorelax</exclude> + <exclude>jaxen:jaxen</exclude> + <exclude>log4j:log4j</exclude> + <exclude>msv:msv</exclude> + <exclude>xalan:xalan</exclude> + <exclude>xml-apis:xml-apis</exclude> + <exclude>saxpath:saxpath</exclude> + <exclude>servletapi:servletapi</exclude> + <exclude>relaxngDatatype:relaxngDatatype</exclude> + <exclude>xerces:xercesImpl</exclude> + <exclude>javax.servlet:servlet-api</exclude> + <exclude>org.eclipse.core:org.eclipse.core.runtime.compatibility.registry</exclude> + </excludes> + <scope>runtime</scope> + </dependencySet> + <dependencySet> + <outputDirectory>qpidmc/eclipse/plugins/org.eclipse.core.runtime.compatibility.registry_3.2.0</outputDirectory> + <outputFileNameMapping>${artifactId}_${version}/</outputFileNameMapping> + <unpack>true</unpack> + <includes> + <include>org.eclipse.core:org.eclipse.core.runtime.compatibility.registry</include> + </includes> + <scope>runtime</scope> + </dependencySet> + </dependencySets> + +</assembly> diff --git a/java/distribution/src/main/assembly/src.xml b/java/distribution/src/main/assembly/src.xml new file mode 100644 index 0000000000..b66425c3d2 --- /dev/null +++ b/java/distribution/src/main/assembly/src.xml @@ -0,0 +1,75 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +--> +<assembly> + <!-- id typically identifies the "type" (src vs bin etc) of the assembly --> + <id>java-src</id> + <includeBaseDirectory>false</includeBaseDirectory> + <formats> + <format>tar.gz</format> + <format>zip</format> + </formats> + + <fileSets> + <fileSet> + <directory>src/main/release</directory> + <outputDirectory>qpid-${qpid.version}-src</outputDirectory> + <includes> + <include>DISCLAIMER</include> + <include>LICENSE</include> + <include>licenses/*.*</include> + <include>NOTICE</include> + <include>README</include> + <include>BUILDING.txt</include> + </includes> + </fileSet> + <fileSet> + <directory>..</directory> + <outputDirectory>qpid-${qpid.version}-src</outputDirectory> + <includes> + <include>**/*</include> + </includes> + <excludes> + <exclude>build.xml</exclude> + <exclude>distribution/build.xml</exclude> + <exclude>benchmark</exclude> + <exclude>benchmark/**/*</exclude> + <exclude>**/target</exclude> + <exclude>**/target/**/*</exclude> + <exclude>**/build</exclude> + <exclude>**/build/**/*</exclude> + <exclude>**/.settings</exclude> + <exclude>**/.classpath</exclude> + <exclude>**/.project</exclude> + <exclude>**/.wtpmodules</exclude> + <exclude>**/surefire*</exclude> + <exclude>**/cobertura.ser</exclude> + <exclude>bin</exclude> + <exclude>bin/*</exclude> + <exclude>lib</exclude> + <exclude>lib/**/*</exclude> + <exclude>**/var/journal</exclude> + <exclude>**/build.out*</exclude> + <exclude>**/eclipse-plugin/bin/**</exclude> + <exclude>**/eclipse-plugin/plugins/**</exclude> + <exclude>**/eclipse-plugin/src/main/resources/**</exclude> + </excludes> + </fileSet> + </fileSets> +</assembly> diff --git a/java/distribution/src/main/release/DISCLAIMER b/java/distribution/src/main/release/DISCLAIMER new file mode 100644 index 0000000000..c9a0ddf8f9 --- /dev/null +++ b/java/distribution/src/main/release/DISCLAIMER @@ -0,0 +1,5 @@ +Apache Qpid is an effort undergoing incubation at the Apache Software Foundation (ASF), sponsored by the Apache Incubator PMC.
+
+Incubation is required of all newly accepted projects until a further review indicates that the infrastructure, communications, and decision making process have stabilized in a manner consistent with other successful ASF projects.
+
+While incubation status is not necessarily a reflection of the completeness or stability of the code, it does indicate that the project has yet to be fully endorsed by the ASF.
diff --git a/java/pom.xml b/java/pom.xml index dd5280cfde..b45c14b91b 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -87,6 +87,7 @@ <eclipse.plugin.version>2.2</eclipse.plugin.version> <jar.version>2.0</jar.version> <javadoc.version>2.0</javadoc.version> + <junit.version>3.8.1</junit.version> <jxr.version>2.0</jxr.version> <mprojectinfo.version>2.0</mprojectinfo.version> <resources.version>2.2</resources.version> @@ -94,9 +95,10 @@ <surefire-report.version>2.1-SNAPSHOT</surefire-report.version> <surefire.version>2.2</surefire.version> - <amqj.logging.level>debug</amqj.logging.level> + <amqj.logging.level>warn</amqj.logging.level> <eclipse.workspace.dir>${basedir}/${topDirectoryLocation}/../workspace</eclipse.workspace.dir> + <clover.license.pathname>/set/clover/license/path/here</clover.license.pathname> </properties> <modules> @@ -160,6 +162,7 @@ <pluginManagement> <plugins> + <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-antrun-plugin</artifactId> @@ -172,6 +175,52 @@ </dependency> </dependencies> + + <executions> + + <!-- This Ant task writes the module name, version and the Subversion version information out to a properties file. + The svnversion command must be available to run from the command line for this to work. The build will not fail if + svnversion cannot be run though. + This is done during the 'compile' phase to reflect the version of the currently compiled code and to ensure that + these properties are up to date when running from a file system classpath. Consider moving this to, or running a second + time, during the 'package' phase to capture the version of any resources added to jar files. + This svnversion command is always run in the top directory to accurately reflect the svnversion range accross all modules + at the time of the build. + The properties are placed into a file 'version.properties' in the target/classes directory of any child module that runs + this plugin. + The 'version.properties' file is loaded by the org.apache.qpid.common.QpidProperties class. + Be carefull of the possibility that the 'common' module may run this antrun plugin and recieve its own set of + version.properties and then the client or broker being built against an older version of the common library ending up with + the wrong version information. This is unlikely to happen because the client or broker should pick up its own properties + from the classpath first. If this happens it will be obvious because the productName property will be + 'Qpid Common Utilities'. If this is a problem then push this ant task down into the client and broker poms and remove it + from here. + --> + <execution> + <id>version_properties</id> + <phase>compile</phase> + <configuration> + <tasks> + + <exec executable="svnversion" spawn="false" failifexecutionfails="false" + dir="${topDirectoryLocation}" outputproperty="svnversion"> + <arg line="."/> + </exec> + + <!-- Write the version.properties out. --> + <propertyfile file="target/classes/qpidversion.properties"> + <entry key="qpid.svnversion" value="${svnversion}"/> + <entry key="qpid.name" value="${project.name}"/> + <entry key="qpid.version" value="${project.version}"/> + </propertyfile> + + </tasks> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> @@ -241,9 +290,25 @@ <downloadJavadocs>true</downloadJavadocs> </configuration> </plugin> - + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-clover-plugin</artifactId> + <version>2.3</version> + <configuration> + <licenseLocation>${clover.license.pathname}</licenseLocation> + <jdk>${java.source.version}</jdk> + </configuration> + <executions> + <execution> + <phase>verify</phase> + <goals> + <goal>instrument</goal> + <goal>aggregate</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> - </pluginManagement> <defaultGoal>install</defaultGoal> </build> @@ -335,7 +400,7 @@ <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> - <version>3.8.1</version> + <version>${junit.version}</version> <scope>test</scope> </dependency> <dependency> @@ -399,6 +464,10 @@ <artifactId>maven-javadoc-plugin</artifactId> <version>${javadoc.version}</version> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-clover-plugin</artifactId> + </plugin> </plugins> </reporting> diff --git a/java/systests/pom.xml b/java/systests/pom.xml index 93c8a2333b..c73e5f2c44 100644 --- a/java/systests/pom.xml +++ b/java/systests/pom.xml @@ -34,7 +34,6 @@ <properties> <topDirectoryLocation>..</topDirectoryLocation> - <amqj.logging.level>warn</amqj.logging.level> </properties> <dependencies> diff --git a/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java index a0765f6924..258bcecc41 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java @@ -95,7 +95,8 @@ public class TxAckTest extends TestCase Scenario(int messageCount, List<Long> acked, List<Long> unacked) { TransactionalContext txnContext = new NonTransactionalContext(new TestableMemoryMessageStore(), null, - new LinkedList<RequiredDeliveryException>()); + new LinkedList<RequiredDeliveryException>(), + new HashSet<Long>()); for(int i = 0; i < messageCount; i++) { long deliveryTag = i + 1; diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 991a098678..5909ac048b 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -202,7 +202,8 @@ public class AbstractHeadersExchangeTestBase extends TestCase private static MessageStore _messageStore = new SkeletonMessageStore(); private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, null, - new LinkedList<RequiredDeliveryException>()); + new LinkedList<RequiredDeliveryException>(), + new HashSet<Long>()); Message(String id, String... headers) throws AMQException { diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java new file mode 100644 index 0000000000..c8a87a0a0e --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java @@ -0,0 +1,124 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.exchange; + +import junit.framework.TestCase; +import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; + +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; +import java.util.ArrayList; + +/** + * Unit test class for testing different Exchange MBean operations + */ +public class ExchangeMBeanTest extends TestCase +{ + private AMQQueue _queue; + private QueueRegistry _queueRegistry; + + /** + * Test for direct exchange mbean + * @throws Exception + */ + public void testDirectExchangeMBean() throws Exception + { + DestNameExchange exchange = new DestNameExchange(); + exchange.initialise("amq.direct", false, 0, true); + ManagedObject managedObj = exchange.getManagedObject(); + ManagedExchange mbean = (ManagedExchange)managedObj; + + mbean.createNewBinding(_queue.getName(), "binding1"); + mbean.createNewBinding(_queue.getName(), "binding2"); + + TabularData data = mbean.bindings(); + ArrayList<CompositeData> list = new ArrayList<CompositeData>(data.values()); + assertTrue(list.size() == 2); + + // test general exchange properties + assertEquals(mbean.getName(), "amq.direct"); + assertEquals(mbean.getExchangeType(), "direct"); + assertTrue(mbean.getTicketNo() == 0); + assertTrue(!mbean.isDurable()); + assertTrue(mbean.isAutoDelete()); + } + + /** + * Test for "topic" exchange mbean + * @throws Exception + */ + public void testTopicExchangeMBean() throws Exception + { + DestWildExchange exchange = new DestWildExchange(); + exchange.initialise("amq.topic", false, 0, true); + ManagedObject managedObj = exchange.getManagedObject(); + ManagedExchange mbean = (ManagedExchange)managedObj; + + mbean.createNewBinding(_queue.getName(), "binding1"); + mbean.createNewBinding(_queue.getName(), "binding2"); + + TabularData data = mbean.bindings(); + ArrayList<CompositeData> list = new ArrayList<CompositeData>(data.values()); + assertTrue(list.size() == 2); + + // test general exchange properties + assertEquals(mbean.getName(), "amq.topic"); + assertEquals(mbean.getExchangeType(), "topic"); + assertTrue(mbean.getTicketNo() == 0); + assertTrue(!mbean.isDurable()); + assertTrue(mbean.isAutoDelete()); + } + + /** + * Test for "Headers" exchange mbean + * @throws Exception + */ + public void testHeadersExchangeMBean() throws Exception + { + HeadersExchange exchange = new HeadersExchange(); + exchange.initialise("amq.headers", false, 0, true); + ManagedObject managedObj = exchange.getManagedObject(); + ManagedExchange mbean = (ManagedExchange)managedObj; + + mbean.createNewBinding(_queue.getName(), "key1=binding1,key2=binding2"); + mbean.createNewBinding(_queue.getName(), "key3=binding3"); + + TabularData data = mbean.bindings(); + ArrayList<CompositeData> list = new ArrayList<CompositeData>(data.values()); + assertTrue(list.size() == 2); + + // test general exchange properties + assertEquals(mbean.getName(), "amq.headers"); + assertEquals(mbean.getExchangeType(), "headers"); + assertTrue(mbean.getTicketNo() == 0); + assertTrue(!mbean.isDurable()); + assertTrue(mbean.isAutoDelete()); + } + + @Override + protected void setUp() throws Exception + { + super.setUp(); + _queueRegistry = ApplicationRegistry.getInstance().getQueueRegistry(); + _queue = new AMQQueue("testQueue", false, "ExchangeMBeanTest", false, _queueRegistry); + _queueRegistry.registerQueue(_queue); + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java index ec6a82cc29..b125bc1d4c 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java @@ -6,6 +6,7 @@ import org.apache.qpid.test.VMBrokerSetup; import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.client.*;
+import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -38,12 +39,14 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex protected void setUp() throws Exception
{
super.setUp();
+ TransportConnection.createVMBroker(1);
ApplicationRegistry.initialise(new TestApplicationRegistry(), 1);
}
protected void tearDown() throws Exception
{
super.tearDown();
+ TransportConnection.killAllVMBrokers();
}
/**
@@ -56,13 +59,14 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex _bouncedMessageList.clear();
Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+
AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- AMQHeadersExchange queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS+"://"+ExchangeDefaults.HEADERS_EXCHANGE_NAME+"/test/queue1?"+ BindingURL.OPTION_ROUTING_KEY+"='F0000=1'"));
+ AMQHeadersExchange queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS + "://" + ExchangeDefaults.HEADERS_EXCHANGE_NAME + "/test/queue1?" + BindingURL.OPTION_ROUTING_KEY + "='F0000=1'"));
FieldTable ft = new PropertyFieldTable();
- ft.setString("F1000","1");
- MessageConsumer consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String)null, ft);
-
+ ft.setString("F1000", "1");
+ MessageConsumer consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String) null, ft);
+
//force synch to ensure the consumer has resulted in a bound queue
((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
@@ -76,49 +80,45 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex con2.start();
- MessageProducer nonMandatoryProducer = producerSession.createProducer(queue,false,false);
+ MessageProducer nonMandatoryProducer = producerSession.createProducer(queue, false, false);
MessageProducer mandatoryProducer = producerSession.createProducer(queue);
-
// First test - should neither be bounced nor routed
_logger.info("Sending non-routable non-mandatory message");
- TextMessage msg1 = producerSession.createTextMessage("msg1");
+ TextMessage msg1 = producerSession.createTextMessage("msg1");
nonMandatoryProducer.send(msg1);
// Second test - should be bounced
_logger.info("Sending non-routable mandatory message");
- TextMessage msg2 = producerSession.createTextMessage("msg2");
+ TextMessage msg2 = producerSession.createTextMessage("msg2");
mandatoryProducer.send(msg2);
// Third test - should be routed
_logger.info("Sending routable message");
- TextMessage msg3 = producerSession.createTextMessage("msg3");
- msg3.setStringProperty("F1000","1");
+ TextMessage msg3 = producerSession.createTextMessage("msg3");
+ msg3.setStringProperty("F1000", "1");
mandatoryProducer.send(msg3);
-
_logger.info("Starting consumer connection");
con.start();
TextMessage tm = (TextMessage) consumer.receive(1000L);
- assertTrue("No message routed to receiver",tm != null);
- assertTrue("Wrong message routed to receiver: "+tm.getText(),"msg3".equals(tm.getText()));
+ assertTrue("No message routed to receiver", tm != null);
+ assertTrue("Wrong message routed to receiver: " + tm.getText(), "msg3".equals(tm.getText()));
try
{
Thread.sleep(1000L);
}
- catch(InterruptedException e)
+ catch (InterruptedException e)
{
;
}
- assertTrue("Wrong number of messages bounced (expect 1): "+_bouncedMessageList.size(),_bouncedMessageList.size()==1);
+ assertTrue("Wrong number of messages bounced (expect 1): " + _bouncedMessageList.size(), _bouncedMessageList.size() == 1);
Message m = _bouncedMessageList.get(0);
- assertTrue("Wrong message bounced: "+m.toString(),m.toString().contains("msg2"));
-
-
+ assertTrue("Wrong message bounced: " + m.toString(), m.toString().contains("msg2"));
con.close();
@@ -129,18 +129,23 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(ReturnUnroutableMandatoryMessageTest.class));
+ return new junit.framework.TestSuite(ReturnUnroutableMandatoryMessageTest.class);
}
public void onException(JMSException jmsException)
{
- _logger.warn("Caught exception on producer: ",jmsException);
+
Exception linkedException = jmsException.getLinkedException();
- if(linkedException instanceof AMQNoRouteException)
+ if (linkedException instanceof AMQNoRouteException)
{
AMQNoRouteException noRoute = (AMQNoRouteException) linkedException;
Message bounced = (Message) noRoute.getUndeliveredMessage();
_bouncedMessageList.add(bounced);
+ _logger.info("Caught expected NoRouteException");
+ }
+ else
+ {
+ _logger.warn("Caught exception on producer: ", jmsException);
}
}
}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java new file mode 100644 index 0000000000..c2ac099855 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java @@ -0,0 +1,110 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.protocol; + +import junit.framework.TestCase; +import org.apache.mina.common.IoSession; +import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.exchange.DefaultExchangeFactory; +import org.apache.qpid.server.exchange.DefaultExchangeRegistry; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.queue.DefaultQueueRegistry; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.SkeletonMessageStore; +import org.apache.qpid.AMQException; + +import javax.management.JMException; + +/** + * Test class to test MBean operations for AMQMinaProtocolSession. + */ +public class AMQProtocolSessionMBeanTest extends TestCase +{ + private IoSession _mockIOSession; + private MessageStore _messageStore = new SkeletonMessageStore(); + private AMQMinaProtocolSession _protocolSession; + private AMQChannel _channel; + private QueueRegistry _queueRegistry; + private ExchangeRegistry _exchangeRegistry; + private AMQProtocolSessionMBean _mbean; + + public void testChannels() throws Exception + { + // check the channel count is correct + int channelCount = _mbean.channels().size(); + assertTrue(channelCount == 1); + _protocolSession.addChannel(new AMQChannel(2, _messageStore, null)); + channelCount = _mbean.channels().size(); + assertTrue(channelCount == 2); + + // general properties test + _mbean.setMaximumNumberOfChannels(1000L); + assertTrue(_mbean.getMaximumNumberOfChannels() == 1000L); + + // check APIs + AMQChannel channel3 = new AMQChannel(3, _messageStore, null); + channel3.setLocalTransactional(); + _protocolSession.addChannel(channel3); + _mbean.rollbackTransactions(2); + _mbean.rollbackTransactions(3); + _mbean.commitTransactions(2); + _mbean.commitTransactions(3); + + // This should throw exception, because the channel does't exist + try + { + _mbean.commitTransactions(4); + fail(); + } + catch (JMException ex) + { + System.out.println("expected exception is thrown :" + ex.getMessage()); + } + + // check if closing of session works + _protocolSession.addChannel(new AMQChannel(5, _messageStore, null)); + _mbean.closeConnection(); + try + { + channelCount = _mbean.channels().size(); + assertTrue(channelCount == 0); + // session is now closed so adding another channel should throw an exception + _protocolSession.addChannel(new AMQChannel(6, _messageStore, null)); + fail(); + } + catch(AMQException ex) + { + System.out.println("expected exception is thrown :" + ex.getMessage()); + } + } + + @Override + protected void setUp() throws Exception + { + super.setUp(); + _channel = new AMQChannel(1, _messageStore, null); + _queueRegistry = new DefaultQueueRegistry(); + _exchangeRegistry = new DefaultExchangeRegistry(new DefaultExchangeFactory()); + _mockIOSession = new MockIoSession(); + _protocolSession = new AMQMinaProtocolSession(_mockIOSession, _queueRegistry, _exchangeRegistry, new AMQCodecFactory(true)); + _protocolSession.addChannel(_channel); + _mbean = (AMQProtocolSessionMBean)_protocolSession.getManagedObject(); + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java b/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java index 81dea32a76..cf6366b513 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java +++ b/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java @@ -25,6 +25,7 @@ import org.apache.mina.common.support.DefaultCloseFuture; import org.apache.mina.common.support.DefaultWriteFuture; import java.net.SocketAddress; +import java.net.InetSocketAddress; import java.util.Set; public class MockIoSession implements IoSession @@ -151,7 +152,7 @@ public class MockIoSession implements IoSession public SocketAddress getRemoteAddress() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return new InetSocketAddress("127.0.0.1", 1234); //To change body of implemented methods use File | Settings | File Templates. } public SocketAddress getLocalAddress() diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 4fcc691a2f..562452d729 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -30,18 +30,20 @@ import org.apache.qpid.server.store.SkeletonMessageStore; import javax.management.JMException; import java.util.LinkedList; +import java.util.HashSet; /** * Test class to test AMQQueueMBean attribtues and operations */ -public class AMQQueueMBeanTest extends TestCase +public class AMQQueueMBeanTest extends TestCase { private AMQQueue _queue; private AMQQueueMBean _queueMBean; private QueueRegistry _queueRegistry; private MessageStore _messageStore = new SkeletonMessageStore(); private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, null, - new LinkedList<RequiredDeliveryException>()); + new LinkedList<RequiredDeliveryException>(), + new HashSet<Long>()); private MockProtocolSession _protocolSession; private AMQChannel _channel; @@ -68,14 +70,14 @@ public class AMQQueueMBeanTest extends TestCase assertFalse(mgr.hasActiveSubscribers()); assertTrue(_queueMBean.getActiveConsumerCount() == 0); - _channel = new AMQChannel(1, _messageStore, null); + _channel = new AMQChannel(1, _messageStore, null); _protocolSession = new MockProtocolSession(_messageStore); _protocolSession.addChannel(_channel); - _queue.registerProtocolSession(_protocolSession, 1, "test", false); + _queue.registerProtocolSession(_protocolSession, 1, "test", false, null); assertTrue(_queueMBean.getActiveConsumerCount() == 1); - SubscriptionSet _subscribers = (SubscriptionSet)mgr; + SubscriptionSet _subscribers = (SubscriptionSet) mgr; SubscriptionTestHelper s1 = new SubscriptionTestHelper("S1"); SubscriptionTestHelper s2 = new SubscriptionTestHelper("S2"); _subscribers.addSubscriber(s1); @@ -167,7 +169,7 @@ public class AMQQueueMBeanTest extends TestCase super.setUp(); _queueRegistry = new DefaultQueueRegistry(); _queue = new AMQQueue("testQueue", false, "AMQueueMBeanTest", false, _queueRegistry); - _queueMBean = new AMQQueueMBean(_queue); + _queueMBean = new AMQQueueMBean(_queue); } private void sendMessages(int messageCount) throws AMQException @@ -175,7 +177,8 @@ public class AMQQueueMBeanTest extends TestCase AMQMessage[] messages = new AMQMessage[messageCount]; for (int i = 0; i < messages.length; i++) { - messages[i] = message(false);; + messages[i] = message(false); + ; } for (int i = 0; i < messageCount; i++) { diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java index 1911d38cd2..d4ea728e95 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -38,6 +38,7 @@ import org.apache.qpid.server.util.TestApplicationRegistry; import java.util.LinkedList; import java.util.Set; +import java.util.HashSet; /** * Tests that acknowledgements are handled correctly. @@ -82,7 +83,8 @@ public class AckTest extends TestCase private void publishMessages(int count, boolean persistent) throws AMQException { TransactionalContext txnContext = new NonTransactionalContext(_messageStore, null, - new LinkedList<RequiredDeliveryException>()); + new LinkedList<RequiredDeliveryException>(), + new HashSet<Long>()); MessageHandleFactory factory = new MessageHandleFactory(); for (int i = 1; i <= count; i++) { diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java index 8cf84e0dcf..7843d8a182 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java @@ -34,13 +34,15 @@ import org.apache.qpid.AMQException; import junit.framework.TestCase; import java.util.LinkedList; +import java.util.HashSet; class MessageTestHelper extends TestCase { private final MessageStore _messageStore = new SkeletonMessageStore(); private final TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, null, - new LinkedList<RequiredDeliveryException>()); + new LinkedList<RequiredDeliveryException>(), + new HashSet<Long>()); MessageTestHelper() throws Exception { diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java index bb8fd5bc19..87e5c43932 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; @@ -121,4 +122,13 @@ public class MockProtocolSession implements AMQProtocolSession public void setSaslServer(SaslServer saslServer) { } + + public FieldTable getClientProperties() + { + return null; + } + + public void setClientProperties(FieldTable clientProperties) + { + } } diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index 2773c810d2..fea3c93280 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.queue; import java.util.ArrayList; import java.util.List; +import java.util.Queue; public class SubscriptionTestHelper implements Subscription { @@ -70,6 +71,41 @@ public class SubscriptionTestHelper implements Subscription { } + public boolean hasFilters() + { + return false; + } + + public boolean hasInterest(AMQMessage msg) + { + return true; + } + + public Queue<AMQMessage> getPreDeliveryQueue() + { + return null; + } + + public void enqueueForPreDelivery(AMQMessage msg) + { + //no-op + } + + public boolean isAutoClose() + { + return false; + } + + public void close() + { + //no-op + } + + public boolean isBrowser() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + public int hashCode() { return key.hashCode(); diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java index 97c9becf18..a40a9bf12f 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java +++ b/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java @@ -49,7 +49,7 @@ public class TestReferenceCounting extends TestCase { createPersistentContentHeader(); AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody(), - new NonTransactionalContext(_store, null, null), + new NonTransactionalContext(_store, null, null, null), createPersistentContentHeader()); message.incrementReference(); // we call routing complete to set up the handle @@ -71,7 +71,7 @@ public class TestReferenceCounting extends TestCase public void testMessageRemains() throws AMQException { AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody(), - new NonTransactionalContext(_store, null, null), + new NonTransactionalContext(_store, null, null, null), createPersistentContentHeader()); message.incrementReference(); // we call routing complete to set up the handle |