summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-26 21:04:28 +0000
committerRobert Greig <rgreig@apache.org>2006-12-26 21:04:28 +0000
commit8727d212214ddcfe4a1f945ab14a0cd8d59b9837 (patch)
tree9ad3f470957eb36921ab4a8c354343b160b22f8e
parent0d5658dfb5a210a6987ef75e2d1813944ab79c4c (diff)
downloadqpid-python-8727d212214ddcfe4a1f945ab14a0cd8d59b9837.tar.gz
Merge of trunk up to rev 489403
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@490372 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/etc/config.xml4
-rw-r--r--java/broker/etc/qpid-server.conf.jpp49
-rw-r--r--java/broker/pom.xml28
-rw-r--r--java/broker/src/main/grammar/SelectorParser.jj598
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java21
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java219
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java100
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java47
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java467
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java201
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java44
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java37
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java86
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java79
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java96
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java30
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java46
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java305
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java77
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java265
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java132
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java58
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java109
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java19
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java307
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java53
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java29
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java47
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java388
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java236
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java64
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java26
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java9
-rw-r--r--java/client/pom.xml6
-rw-r--r--java/client/src/log4j.properties4
-rw-r--r--java/client/src/main/java/log4j.properties4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java128
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java173
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java105
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java13
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java35
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java23
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java13
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java18
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java1
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java1
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java41
-rw-r--r--java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java1
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java10
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java141
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java4
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java7
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java25
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java50
-rw-r--r--java/common/src/main/java/log4j.properties28
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java31
-rw-r--r--java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java40
-rw-r--r--java/common/src/main/java/org/apache/qpid/common/ClientProperties.java29
-rw-r--r--java/common/src/main/java/org/apache/qpid/common/QpidProperties.java118
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java15
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java4
-rw-r--r--java/distribution/pom.xml142
-rw-r--r--java/distribution/src/main/assembly/bin.xml172
-rw-r--r--java/distribution/src/main/assembly/client-bin.xml108
-rw-r--r--java/distribution/src/main/assembly/management-eclipse-plugin-unix.xml112
-rw-r--r--java/distribution/src/main/assembly/management-eclipse-plugin.xml142
-rw-r--r--java/distribution/src/main/assembly/src.xml75
-rw-r--r--java/distribution/src/main/release/DISCLAIMER5
-rw-r--r--java/pom.xml77
-rw-r--r--java/systests/pom.xml1
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java3
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java3
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java124
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java47
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java110
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java3
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java17
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java4
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java4
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java10
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java36
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java4
99 files changed, 6486 insertions, 272 deletions
diff --git a/java/broker/etc/config.xml b/java/broker/etc/config.xml
index 0862588a0d..40e2f468e0 100644
--- a/java/broker/etc/config.xml
+++ b/java/broker/etc/config.xml
@@ -82,8 +82,8 @@
<auto_register>true</auto_register>
</queue>
<store>
- <!--<class>org.apache.qpid.server.store.MemoryMessageStore</class>-->
- <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+ <!--<class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>-->
</store>
<virtualhosts>${conf}/virtualhosts.xml</virtualhosts>
</broker>
diff --git a/java/broker/etc/qpid-server.conf.jpp b/java/broker/etc/qpid-server.conf.jpp
new file mode 100644
index 0000000000..3ed2431ef3
--- /dev/null
+++ b/java/broker/etc/qpid-server.conf.jpp
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+QPID_LIBS=$(build-classpath backport-util-concurrent \
+ commons-beanutils \
+ commons-beanutils-core \
+ commons-cli \
+ commons-codec \
+ commons-collections \
+ commons-configuration \
+ commons-digester \
+ commons-lang \
+ commons-logging \
+ commons-logging-api \
+ dom4j \
+ geronimo-jms-1.1-api \
+ isorelax \
+ jaxen \
+ log4j \
+ mina/core \
+ mina/filter-ssl \
+ mina/java5 \
+ msv-msv \
+ qpid-broker \
+ qpid-client \
+ qpid-common \
+ relaxngDatatype \
+ slf4j)
+
+export JAVA=java \
+ JAVA_VM=-server \
+ JAVA_MEM=-Xmx1024m \
+ CLASSPATH=$QPID_LIBS
diff --git a/java/broker/pom.xml b/java/broker/pom.xml
index aea2d5878a..92a3d69060 100644
--- a/java/broker/pom.xml
+++ b/java/broker/pom.xml
@@ -34,7 +34,6 @@
<properties>
<topDirectoryLocation>..</topDirectoryLocation>
- <amqj.logging.level>warn</amqj.logging.level>
</properties>
<dependencies>
@@ -55,6 +54,10 @@
<artifactId>commons-lang</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jms_1.1_spec</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-filter-ssl</artifactId>
</dependency>
@@ -84,6 +87,29 @@
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>javacc-maven-plugin</artifactId>
+ <version>2.0</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <configuration>
+ <sourceDirectory>${basedir}/src/main/grammar</sourceDirectory>
+ <outputDirectory>${basedir}/target/generated-sources</outputDirectory>
+ <packageName>org.apache.qpid.server.filter.jms.selector</packageName>
+ </configuration>
+ <goals>
+ <goal>javacc</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemProperties>
diff --git a/java/broker/src/main/grammar/SelectorParser.jj b/java/broker/src/main/grammar/SelectorParser.jj
new file mode 100644
index 0000000000..5553a46e47
--- /dev/null
+++ b/java/broker/src/main/grammar/SelectorParser.jj
@@ -0,0 +1,598 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+ //
+ // Original File from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+ //
+
+// ----------------------------------------------------------------------------
+// OPTIONS
+// ----------------------------------------------------------------------------
+options {
+ STATIC = false;
+ UNICODE_INPUT = true;
+
+ // some performance optimizations
+ OPTIMIZE_TOKEN_MANAGER = true;
+ ERROR_REPORTING = false;
+}
+
+// ----------------------------------------------------------------------------
+// PARSER
+// ----------------------------------------------------------------------------
+
+PARSER_BEGIN(SelectorParser)
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.filter.jms.selector;
+
+import java.io.*;
+import java.util.*;
+
+import javax.jms.InvalidSelectorException;
+
+import org.apache.qpid.server.filter.*;
+
+/**
+ * JMS Selector Parser generated by JavaCC
+ *
+ * Do not edit this .java file directly - it is autogenerated from SelectorParser.jj
+ */
+public class SelectorParser {
+
+ public SelectorParser() {
+ this(new StringReader(""));
+ }
+
+ public BooleanExpression parse(String sql) throws InvalidSelectorException {
+ this.ReInit(new StringReader(sql));
+
+ try {
+ return this.JmsSelector();
+ }
+ catch (Throwable e) {
+ throw (InvalidSelectorException)new InvalidSelectorException(sql).initCause(e);
+ }
+
+ }
+
+ private BooleanExpression asBooleanExpression(Expression value) throws ParseException {
+ if (value instanceof BooleanExpression) {
+ return (BooleanExpression) value;
+ }
+ if (value instanceof PropertyExpression) {
+ return UnaryExpression.createBooleanCast( value );
+ }
+ throw new ParseException("Expression will not result in a boolean value: " + value);
+ }
+
+
+}
+
+PARSER_END(SelectorParser)
+
+// ----------------------------------------------------------------------------
+// Tokens
+// ----------------------------------------------------------------------------
+
+/* White Space */
+SPECIAL_TOKEN :
+{
+ " " | "\t" | "\n" | "\r" | "\f"
+}
+
+/* Comments */
+SKIP:
+{
+ <LINE_COMMENT: "--" (~["\n","\r"])* ("\n"|"\r"|"\r\n") >
+}
+
+SKIP:
+{
+ <BLOCK_COMMENT: "/*" (~["*"])* "*" ("*" | (~["*","/"] (~["*"])* "*"))* "/">
+}
+
+/* Reserved Words */
+TOKEN [IGNORE_CASE] :
+{
+ < NOT : "NOT">
+ | < AND : "AND">
+ | < OR : "OR">
+ | < BETWEEN : "BETWEEN">
+ | < LIKE : "LIKE">
+ | < ESCAPE : "ESCAPE">
+ | < IN : "IN">
+ | < IS : "IS">
+ | < TRUE : "TRUE" >
+ | < FALSE : "FALSE" >
+ | < NULL : "NULL" >
+ | < XPATH : "XPATH" >
+ | < XQUERY : "XQUERY" >
+}
+
+/* Literals */
+TOKEN [IGNORE_CASE] :
+{
+
+ < DECIMAL_LITERAL: ["1"-"9"] (["0"-"9"])* (["l","L"])? >
+ | < HEX_LITERAL: "0" ["x","X"] (["0"-"9","a"-"f","A"-"F"])+ >
+ | < OCTAL_LITERAL: "0" (["0"-"7"])* >
+ | < FLOATING_POINT_LITERAL:
+ (["0"-"9"])+ "." (["0"-"9"])* (<EXPONENT>)? // matches: 5.5 or 5. or 5.5E10 or 5.E10
+ | "." (["0"-"9"])+ (<EXPONENT>)? // matches: .5 or .5E10
+ | (["0"-"9"])+ <EXPONENT> // matches: 5E10
+ >
+ | < #EXPONENT: "E" (["+","-"])? (["0"-"9"])+ >
+ | < STRING_LITERAL: "'" ( ("''") | ~["'"] )* "'" >
+}
+
+TOKEN [IGNORE_CASE] :
+{
+ < ID : ["a"-"z", "_", "$"] (["a"-"z","0"-"9","_", "$"])* >
+}
+
+// ----------------------------------------------------------------------------
+// Grammer
+// ----------------------------------------------------------------------------
+BooleanExpression JmsSelector() :
+{
+ Expression left=null;
+}
+{
+ (
+ left = orExpression()
+ )
+ {
+ return asBooleanExpression(left);
+ }
+
+}
+
+Expression orExpression() :
+{
+ Expression left;
+ Expression right;
+}
+{
+ (
+ left = andExpression()
+ (
+ <OR> right = andExpression()
+ {
+ left = LogicExpression.createOR(asBooleanExpression(left), asBooleanExpression(right));
+ }
+ )*
+ )
+ {
+ return left;
+ }
+
+}
+
+
+Expression andExpression() :
+{
+ Expression left;
+ Expression right;
+}
+{
+ (
+ left = equalityExpression()
+ (
+ <AND> right = equalityExpression()
+ {
+ left = LogicExpression.createAND(asBooleanExpression(left), asBooleanExpression(right));
+ }
+ )*
+ )
+ {
+ return left;
+ }
+}
+
+Expression equalityExpression() :
+{
+ Expression left;
+ Expression right;
+}
+{
+ (
+ left = comparisonExpression()
+ (
+
+ "=" right = comparisonExpression()
+ {
+ left = ComparisonExpression.createEqual(left, right);
+ }
+ |
+ "<>" right = comparisonExpression()
+ {
+ left = ComparisonExpression.createNotEqual(left, right);
+ }
+ |
+ LOOKAHEAD(2)
+ <IS> <NULL>
+ {
+ left = ComparisonExpression.createIsNull(left);
+ }
+ |
+ <IS> <NOT> <NULL>
+ {
+ left = ComparisonExpression.createIsNotNull(left);
+ }
+ )*
+ )
+ {
+ return left;
+ }
+}
+
+Expression comparisonExpression() :
+{
+ Expression left;
+ Expression right;
+ Expression low;
+ Expression high;
+ String t, u;
+ boolean not;
+ ArrayList list;
+}
+{
+ (
+ left = addExpression()
+ (
+
+ ">" right = addExpression()
+ {
+ left = ComparisonExpression.createGreaterThan(left, right);
+ }
+ |
+ ">=" right = addExpression()
+ {
+ left = ComparisonExpression.createGreaterThanEqual(left, right);
+ }
+ |
+ "<" right = addExpression()
+ {
+ left = ComparisonExpression.createLessThan(left, right);
+ }
+ |
+ "<=" right = addExpression()
+ {
+ left = ComparisonExpression.createLessThanEqual(left, right);
+ }
+ |
+ {
+ u=null;
+ }
+ <LIKE> t = stringLitteral()
+ [ <ESCAPE> u = stringLitteral() ]
+ {
+ left = ComparisonExpression.createLike(left, t, u);
+ }
+ |
+ LOOKAHEAD(2)
+ {
+ u=null;
+ }
+ <NOT> <LIKE> t = stringLitteral() [ <ESCAPE> u = stringLitteral() ]
+ {
+ left = ComparisonExpression.createNotLike(left, t, u);
+ }
+ |
+ <BETWEEN> low = addExpression() <AND> high = addExpression()
+ {
+ left = ComparisonExpression.createBetween(left, low, high);
+ }
+ |
+ LOOKAHEAD(2)
+ <NOT> <BETWEEN> low = addExpression() <AND> high = addExpression()
+ {
+ left = ComparisonExpression.createNotBetween(left, low, high);
+ }
+ |
+ <IN>
+ "("
+ t = stringLitteral()
+ {
+ list = new ArrayList();
+ list.add( t );
+ }
+ (
+ ","
+ t = stringLitteral()
+ {
+ list.add( t );
+ }
+
+ )*
+ ")"
+ {
+ left = ComparisonExpression.createInFilter(left, list);
+ }
+ |
+ LOOKAHEAD(2)
+ <NOT> <IN>
+ "("
+ t = stringLitteral()
+ {
+ list = new ArrayList();
+ list.add( t );
+ }
+ (
+ ","
+ t = stringLitteral()
+ {
+ list.add( t );
+ }
+
+ )*
+ ")"
+ {
+ left = ComparisonExpression.createNotInFilter(left, list);
+ }
+
+ )*
+ )
+ {
+ return left;
+ }
+}
+
+Expression addExpression() :
+{
+ Expression left;
+ Expression right;
+}
+{
+ left = multExpr()
+ (
+ LOOKAHEAD( ("+"|"-") multExpr())
+ (
+ "+" right = multExpr()
+ {
+ left = ArithmeticExpression.createPlus(left, right);
+ }
+ |
+ "-" right = multExpr()
+ {
+ left = ArithmeticExpression.createMinus(left, right);
+ }
+ )
+
+ )*
+ {
+ return left;
+ }
+}
+
+Expression multExpr() :
+{
+ Expression left;
+ Expression right;
+}
+{
+ left = unaryExpr()
+ (
+ "*" right = unaryExpr()
+ {
+ left = ArithmeticExpression.createMultiply(left, right);
+ }
+ |
+ "/" right = unaryExpr()
+ {
+ left = ArithmeticExpression.createDivide(left, right);
+ }
+ |
+ "%" right = unaryExpr()
+ {
+ left = ArithmeticExpression.createMod(left, right);
+ }
+
+ )*
+ {
+ return left;
+ }
+}
+
+
+Expression unaryExpr() :
+{
+ String s=null;
+ Expression left=null;
+}
+{
+ (
+ LOOKAHEAD( "+" unaryExpr() )
+ "+" left=unaryExpr()
+ |
+ "-" left=unaryExpr()
+ {
+ left = UnaryExpression.createNegate(left);
+ }
+ |
+ <NOT> left=unaryExpr()
+ {
+ left = UnaryExpression.createNOT( asBooleanExpression(left) );
+ }
+ |
+ <XPATH> s=stringLitteral()
+ {
+ left = UnaryExpression.createXPath( s );
+ }
+ |
+ <XQUERY> s=stringLitteral()
+ {
+ left = UnaryExpression.createXQuery( s );
+ }
+ |
+ left = primaryExpr()
+ )
+ {
+ return left;
+ }
+
+}
+
+Expression primaryExpr() :
+{
+ Expression left=null;
+}
+{
+ (
+ left = literal()
+ |
+ left = variable()
+ |
+ "(" left = orExpression() ")"
+ )
+ {
+ return left;
+ }
+}
+
+
+
+ConstantExpression literal() :
+{
+ Token t;
+ String s;
+ ConstantExpression left=null;
+}
+{
+ (
+ (
+ s = stringLitteral()
+ {
+ left = new ConstantExpression(s);
+ }
+ )
+ |
+ (
+ t = <DECIMAL_LITERAL>
+ {
+ left = ConstantExpression.createFromDecimal(t.image);
+ }
+ )
+ |
+ (
+ t = <HEX_LITERAL>
+ {
+ left = ConstantExpression.createFromHex(t.image);
+ }
+ )
+ |
+ (
+ t = <OCTAL_LITERAL>
+ {
+ left = ConstantExpression.createFromOctal(t.image);
+ }
+ )
+ |
+ (
+ t = <FLOATING_POINT_LITERAL>
+ {
+ left = ConstantExpression.createFloat(t.image);
+ }
+ )
+ |
+ (
+ <TRUE>
+ {
+ left = ConstantExpression.TRUE;
+ }
+ )
+ |
+ (
+ <FALSE>
+ {
+ left = ConstantExpression.FALSE;
+ }
+ )
+ |
+ (
+ <NULL>
+ {
+ left = ConstantExpression.NULL;
+ }
+ )
+ )
+ {
+ return left;
+ }
+}
+
+String stringLitteral() :
+{
+ Token t;
+ StringBuffer rc = new StringBuffer();
+ boolean first=true;
+}
+{
+ t = <STRING_LITERAL>
+ {
+ // Decode the sting value.
+ String image = t.image;
+ for( int i=1; i < image.length()-1; i++ ) {
+ char c = image.charAt(i);
+ if( c == '\'' )
+ i++;
+ rc.append(c);
+ }
+ return rc.toString();
+ }
+}
+
+PropertyExpression variable() :
+{
+ Token t;
+ PropertyExpression left=null;
+}
+{
+ (
+ t = <ID>
+ {
+ left = new PropertyExpression(t.image);
+ }
+ )
+ {
+ return left;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 63cc57b05e..24f61b2426 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -25,6 +25,8 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.ack.TxAck;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
@@ -41,6 +43,8 @@ import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.TxnBuffer;
import java.util.*;
+import java.util.Set;
+import java.util.HashSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -102,6 +106,8 @@ public class AMQChannel
private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory();
+ private Set<Long> _browsedAcks = new HashSet<Long>();
+
public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges)
throws AMQException
{
@@ -111,7 +117,7 @@ public class AMQChannel
_messageStore = messageStore;
_exchanges = exchanges;
// by default the session is non-transactional
- _txnContext = new NonTransactionalContext(_messageStore, this, _returnMessages);
+ _txnContext = new NonTransactionalContext(_messageStore, this, _returnMessages, _browsedAcks);
}
/**
@@ -311,13 +317,14 @@ public class AMQChannel
* @param tag the tag chosen by the client (if null, server will generate one)
* @param queue the queue to subscribe to
* @param session the protocol session of the subscriber
+ * @param noLocal
* @return the consumer tag. This is returned to the subscriber and used in
* subsequent unsubscribe requests
* @throws ConsumerTagNotUniqueException if the tag is not unique
* @throws AMQException if something goes wrong
*/
- public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks)
- throws AMQException, ConsumerTagNotUniqueException
+ public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks,
+ FieldTable filters, boolean noLocal) throws AMQException, ConsumerTagNotUniqueException
{
if (tag == null)
{
@@ -328,7 +335,7 @@ public class AMQChannel
throw new ConsumerTagNotUniqueException();
}
- queue.registerProtocolSession(session, _channelId, tag, acks);
+ queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal);
_consumerTag2QueueMap.put(tag, queue);
return tag;
}
@@ -524,6 +531,12 @@ public class AMQChannel
return _unacknowledgedMessageMap;
}
+ public void addUnacknowledgedBrowsedMessage(AMQMessage msg, long deliveryTag, String consumerTag, AMQQueue queue)
+ {
+ _browsedAcks.add(deliveryTag);
+ addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ }
+
private void checkSuspension()
{
boolean suspend;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
new file mode 100644
index 0000000000..c536f77dde
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
@@ -0,0 +1,219 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.AMQException;
+
+import javax.jms.JMSException;
+
+/**
+ * An expression which performs an operation on two expression values
+ *
+ * @version $Revision$
+ */
+public abstract class ArithmeticExpression extends BinaryExpression {
+
+ protected static final int INTEGER = 1;
+ protected static final int LONG = 2;
+ protected static final int DOUBLE = 3;
+
+ /**
+ * @param left
+ * @param right
+ */
+ public ArithmeticExpression(Expression left, Expression right) {
+ super(left, right);
+ }
+
+ public static Expression createPlus(Expression left, Expression right) {
+ return new ArithmeticExpression(left, right) {
+ protected Object evaluate(Object lvalue, Object rvalue) {
+ if (lvalue instanceof String) {
+ String text = (String) lvalue;
+ String answer = text + rvalue;
+ return answer;
+ }
+ else if (lvalue instanceof Number) {
+ return plus((Number) lvalue, asNumber(rvalue));
+ }
+ throw new RuntimeException("Cannot call plus operation on: " + lvalue + " and: " + rvalue);
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "+";
+ }
+ };
+ }
+
+ public static Expression createMinus(Expression left, Expression right) {
+ return new ArithmeticExpression(left, right) {
+ protected Object evaluate(Object lvalue, Object rvalue) {
+ if (lvalue instanceof Number) {
+ return minus((Number) lvalue, asNumber(rvalue));
+ }
+ throw new RuntimeException("Cannot call minus operation on: " + lvalue + " and: " + rvalue);
+ }
+
+ public String getExpressionSymbol() {
+ return "-";
+ }
+ };
+ }
+
+ public static Expression createMultiply(Expression left, Expression right) {
+ return new ArithmeticExpression(left, right) {
+
+ protected Object evaluate(Object lvalue, Object rvalue) {
+ if (lvalue instanceof Number) {
+ return multiply((Number) lvalue, asNumber(rvalue));
+ }
+ throw new RuntimeException("Cannot call multiply operation on: " + lvalue + " and: " + rvalue);
+ }
+
+ public String getExpressionSymbol() {
+ return "*";
+ }
+ };
+ }
+
+ public static Expression createDivide(Expression left, Expression right) {
+ return new ArithmeticExpression(left, right) {
+
+ protected Object evaluate(Object lvalue, Object rvalue) {
+ if (lvalue instanceof Number) {
+ return divide((Number) lvalue, asNumber(rvalue));
+ }
+ throw new RuntimeException("Cannot call divide operation on: " + lvalue + " and: " + rvalue);
+ }
+
+ public String getExpressionSymbol() {
+ return "/";
+ }
+ };
+ }
+
+ public static Expression createMod(Expression left, Expression right) {
+ return new ArithmeticExpression(left, right) {
+
+ protected Object evaluate(Object lvalue, Object rvalue) {
+ if (lvalue instanceof Number) {
+ return mod((Number) lvalue, asNumber(rvalue));
+ }
+ throw new RuntimeException("Cannot call mod operation on: " + lvalue + " and: " + rvalue);
+ }
+
+ public String getExpressionSymbol() {
+ return "%";
+ }
+ };
+ }
+
+ protected Number plus(Number left, Number right) {
+ switch (numberType(left, right)) {
+ case INTEGER:
+ return new Integer(left.intValue() + right.intValue());
+ case LONG:
+ return new Long(left.longValue() + right.longValue());
+ default:
+ return new Double(left.doubleValue() + right.doubleValue());
+ }
+ }
+
+ protected Number minus(Number left, Number right) {
+ switch (numberType(left, right)) {
+ case INTEGER:
+ return new Integer(left.intValue() - right.intValue());
+ case LONG:
+ return new Long(left.longValue() - right.longValue());
+ default:
+ return new Double(left.doubleValue() - right.doubleValue());
+ }
+ }
+
+ protected Number multiply(Number left, Number right) {
+ switch (numberType(left, right)) {
+ case INTEGER:
+ return new Integer(left.intValue() * right.intValue());
+ case LONG:
+ return new Long(left.longValue() * right.longValue());
+ default:
+ return new Double(left.doubleValue() * right.doubleValue());
+ }
+ }
+
+ protected Number divide(Number left, Number right) {
+ return new Double(left.doubleValue() / right.doubleValue());
+ }
+
+ protected Number mod(Number left, Number right) {
+ return new Double(left.doubleValue() % right.doubleValue());
+ }
+
+ private int numberType(Number left, Number right) {
+ if (isDouble(left) || isDouble(right)) {
+ return DOUBLE;
+ }
+ else if (left instanceof Long || right instanceof Long) {
+ return LONG;
+ }
+ else {
+ return INTEGER;
+ }
+ }
+
+ private boolean isDouble(Number n) {
+ return n instanceof Float || n instanceof Double;
+ }
+
+ protected Number asNumber(Object value) {
+ if (value instanceof Number) {
+ return (Number) value;
+ }
+ else {
+ throw new RuntimeException("Cannot convert value: " + value + " into a number");
+ }
+ }
+
+ public Object evaluate(AMQMessage message) throws AMQException
+ {
+ Object lvalue = left.evaluate(message);
+ if (lvalue == null) {
+ return null;
+ }
+ Object rvalue = right.evaluate(message);
+ if (rvalue == null) {
+ return null;
+ }
+ return evaluate(lvalue, rvalue);
+ }
+
+
+ /**
+ * @param lvalue
+ * @param rvalue
+ * @return
+ */
+ abstract protected Object evaluate(Object lvalue, Object rvalue);
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
new file mode 100644
index 0000000000..4256ab9189
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
@@ -0,0 +1,100 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+
+
+/**
+ * An expression which performs an operation on two expression values.
+ *
+ * @version $Revision$
+ */
+abstract public class BinaryExpression implements Expression {
+ protected Expression left;
+ protected Expression right;
+
+ public BinaryExpression(Expression left, Expression right) {
+ this.left = left;
+ this.right = right;
+ }
+
+ public Expression getLeft() {
+ return left;
+ }
+
+ public Expression getRight() {
+ return right;
+ }
+
+
+ /**
+ * @see java.lang.Object#toString()
+ */
+ public String toString() {
+ return "(" + left.toString() + " " + getExpressionSymbol() + " " + right.toString() + ")";
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#hashCode()
+ */
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ public boolean equals(Object o) {
+
+ if (o == null || !this.getClass().equals(o.getClass())) {
+ return false;
+ }
+ return toString().equals(o.toString());
+
+ }
+
+ /**
+ * Returns the symbol that represents this binary expression. For example, addition is
+ * represented by "+"
+ *
+ * @return
+ */
+ abstract public String getExpressionSymbol();
+
+ /**
+ * @param expression
+ */
+ public void setRight(Expression expression) {
+ right = expression;
+ }
+
+ /**
+ * @param expression
+ */
+ public void setLeft(Expression expression) {
+ left = expression;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
new file mode 100644
index 0000000000..de71e95049
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
@@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
+
+import javax.jms.JMSException;
+
+
+/**
+ * A BooleanExpression is an expression that always
+ * produces a Boolean result.
+ *
+ * @version $Revision$
+ */
+public interface BooleanExpression extends Expression
+{
+
+ /**
+ * @param message
+ * @return true if the expression evaluates to Boolean.TRUE.
+ * @throws JMSException
+ */
+ public boolean matches(AMQMessage message) throws AMQException;
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
new file mode 100644
index 0000000000..07391098ce
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
@@ -0,0 +1,467 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import javax.jms.JMSException;
+
+/**
+ * A filter performing a comparison of two objects
+ *
+ * @version $Revision$
+ */
+public abstract class ComparisonExpression extends BinaryExpression implements BooleanExpression {
+
+ public static BooleanExpression createBetween(Expression value, Expression left, Expression right) {
+ return LogicExpression.createAND(createGreaterThanEqual(value, left), createLessThanEqual(value, right));
+ }
+
+ public static BooleanExpression createNotBetween(Expression value, Expression left, Expression right) {
+ return LogicExpression.createOR(createLessThan(value, left), createGreaterThan(value, right));
+ }
+
+ static final private HashSet REGEXP_CONTROL_CHARS = new HashSet();
+
+ static {
+ REGEXP_CONTROL_CHARS.add(new Character('.'));
+ REGEXP_CONTROL_CHARS.add(new Character('\\'));
+ REGEXP_CONTROL_CHARS.add(new Character('['));
+ REGEXP_CONTROL_CHARS.add(new Character(']'));
+ REGEXP_CONTROL_CHARS.add(new Character('^'));
+ REGEXP_CONTROL_CHARS.add(new Character('$'));
+ REGEXP_CONTROL_CHARS.add(new Character('?'));
+ REGEXP_CONTROL_CHARS.add(new Character('*'));
+ REGEXP_CONTROL_CHARS.add(new Character('+'));
+ REGEXP_CONTROL_CHARS.add(new Character('{'));
+ REGEXP_CONTROL_CHARS.add(new Character('}'));
+ REGEXP_CONTROL_CHARS.add(new Character('|'));
+ REGEXP_CONTROL_CHARS.add(new Character('('));
+ REGEXP_CONTROL_CHARS.add(new Character(')'));
+ REGEXP_CONTROL_CHARS.add(new Character(':'));
+ REGEXP_CONTROL_CHARS.add(new Character('&'));
+ REGEXP_CONTROL_CHARS.add(new Character('<'));
+ REGEXP_CONTROL_CHARS.add(new Character('>'));
+ REGEXP_CONTROL_CHARS.add(new Character('='));
+ REGEXP_CONTROL_CHARS.add(new Character('!'));
+ }
+
+ static class LikeExpression extends UnaryExpression implements BooleanExpression {
+
+ Pattern likePattern;
+
+ /**
+ * @param right
+ */
+ public LikeExpression(Expression right, String like, int escape) {
+ super(right);
+
+ StringBuffer regexp = new StringBuffer(like.length() * 2);
+ regexp.append("\\A"); // The beginning of the input
+ for (int i = 0; i < like.length(); i++) {
+ char c = like.charAt(i);
+ if (escape == (0xFFFF & c)) {
+ i++;
+ if (i >= like.length()) {
+ // nothing left to escape...
+ break;
+ }
+
+ char t = like.charAt(i);
+ regexp.append("\\x");
+ regexp.append(Integer.toHexString(0xFFFF & t));
+ }
+ else if (c == '%') {
+ regexp.append(".*?"); // Do a non-greedy match
+ }
+ else if (c == '_') {
+ regexp.append("."); // match one
+ }
+ else if (REGEXP_CONTROL_CHARS.contains(new Character(c))) {
+ regexp.append("\\x");
+ regexp.append(Integer.toHexString(0xFFFF & c));
+ }
+ else {
+ regexp.append(c);
+ }
+ }
+ regexp.append("\\z"); // The end of the input
+
+ likePattern = Pattern.compile(regexp.toString(), Pattern.DOTALL);
+ }
+
+ /**
+ * org.apache.activemq.filter.UnaryExpression#getExpressionSymbol()
+ */
+ public String getExpressionSymbol() {
+ return "LIKE";
+ }
+
+ /**
+ * org.apache.activemq.filter.Expression#evaluate(MessageEvaluationContext)
+ */
+ public Object evaluate(AMQMessage message) throws AMQException {
+
+ Object rv = this.getRight().evaluate(message);
+
+ if (rv == null) {
+ return null;
+ }
+
+ if (!(rv instanceof String)) {
+ return Boolean.FALSE;
+ //throw new RuntimeException("LIKE can only operate on String identifiers. LIKE attemped on: '" + rv.getClass());
+ }
+
+ return likePattern.matcher((String) rv).matches() ? Boolean.TRUE : Boolean.FALSE;
+ }
+
+ public boolean matches(AMQMessage message) throws AMQException {
+ Object object = evaluate(message);
+ return object!=null && object==Boolean.TRUE;
+ }
+ }
+
+ public static BooleanExpression createLike(Expression left, String right, String escape) {
+ if (escape != null && escape.length() != 1) {
+ throw new RuntimeException("The ESCAPE string litteral is invalid. It can only be one character. Litteral used: " + escape);
+ }
+ int c = -1;
+ if (escape != null) {
+ c = 0xFFFF & escape.charAt(0);
+ }
+
+ return new LikeExpression(left, right, c);
+ }
+
+ public static BooleanExpression createNotLike(Expression left, String right, String escape) {
+ return UnaryExpression.createNOT(createLike(left, right, escape));
+ }
+
+ public static BooleanExpression createInFilter(Expression left, List elements) {
+
+ if( !(left instanceof PropertyExpression) )
+ throw new RuntimeException("Expected a property for In expression, got: "+left);
+ return UnaryExpression.createInExpression((PropertyExpression)left, elements, false);
+
+ }
+
+ public static BooleanExpression createNotInFilter(Expression left, List elements) {
+
+ if( !(left instanceof PropertyExpression) )
+ throw new RuntimeException("Expected a property for In expression, got: "+left);
+ return UnaryExpression.createInExpression((PropertyExpression)left, elements, true);
+
+ }
+
+ public static BooleanExpression createIsNull(Expression left) {
+ return doCreateEqual(left, ConstantExpression.NULL);
+ }
+
+ public static BooleanExpression createIsNotNull(Expression left) {
+ return UnaryExpression.createNOT(doCreateEqual(left, ConstantExpression.NULL));
+ }
+
+ public static BooleanExpression createNotEqual(Expression left, Expression right) {
+ return UnaryExpression.createNOT(createEqual(left, right));
+ }
+
+ public static BooleanExpression createEqual(Expression left, Expression right) {
+ checkEqualOperand(left);
+ checkEqualOperand(right);
+ checkEqualOperandCompatability(left, right);
+ return doCreateEqual(left, right);
+ }
+
+ private static BooleanExpression doCreateEqual(Expression left, Expression right) {
+ return new ComparisonExpression(left, right) {
+
+ public Object evaluate(AMQMessage message) throws AMQException {
+ Object lv = left.evaluate(message);
+ Object rv = right.evaluate(message);
+
+ // Iff one of the values is null
+ if (lv == null ^ rv == null) {
+ return Boolean.FALSE;
+ }
+ if (lv == rv || lv.equals(rv)) {
+ return Boolean.TRUE;
+ }
+ if( lv instanceof Comparable && rv instanceof Comparable ) {
+ return compare((Comparable)lv, (Comparable)rv);
+ }
+ return Boolean.FALSE;
+ }
+
+ protected boolean asBoolean(int answer) {
+ return answer == 0;
+ }
+
+ public String getExpressionSymbol() {
+ return "=";
+ }
+ };
+ }
+
+ public static BooleanExpression createGreaterThan(final Expression left, final Expression right) {
+ checkLessThanOperand(left);
+ checkLessThanOperand(right);
+ return new ComparisonExpression(left, right) {
+ protected boolean asBoolean(int answer) {
+ return answer > 0;
+ }
+
+ public String getExpressionSymbol() {
+ return ">";
+ }
+ };
+ }
+
+ public static BooleanExpression createGreaterThanEqual(final Expression left, final Expression right) {
+ checkLessThanOperand(left);
+ checkLessThanOperand(right);
+ return new ComparisonExpression(left, right) {
+ protected boolean asBoolean(int answer) {
+ return answer >= 0;
+ }
+
+ public String getExpressionSymbol() {
+ return ">=";
+ }
+ };
+ }
+
+ public static BooleanExpression createLessThan(final Expression left, final Expression right) {
+ checkLessThanOperand(left);
+ checkLessThanOperand(right);
+ return new ComparisonExpression(left, right) {
+
+ protected boolean asBoolean(int answer) {
+ return answer < 0;
+ }
+
+ public String getExpressionSymbol() {
+ return "<";
+ }
+
+ };
+ }
+
+ public static BooleanExpression createLessThanEqual(final Expression left, final Expression right) {
+ checkLessThanOperand(left);
+ checkLessThanOperand(right);
+ return new ComparisonExpression(left, right) {
+
+ protected boolean asBoolean(int answer) {
+ return answer <= 0;
+ }
+
+ public String getExpressionSymbol() {
+ return "<=";
+ }
+ };
+ }
+
+ /**
+ * Only Numeric expressions can be used in >, >=, < or <= expressions.s
+ *
+ * @param expr
+ */
+ public static void checkLessThanOperand(Expression expr ) {
+ if( expr instanceof ConstantExpression ) {
+ Object value = ((ConstantExpression)expr).getValue();
+ if( value instanceof Number )
+ return;
+
+ // Else it's boolean or a String..
+ throw new RuntimeException("Value '"+expr+"' cannot be compared.");
+ }
+ if( expr instanceof BooleanExpression ) {
+ throw new RuntimeException("Value '"+expr+"' cannot be compared.");
+ }
+ }
+
+ /**
+ * Validates that the expression can be used in == or <> expression.
+ * Cannot not be NULL TRUE or FALSE litterals.
+ *
+ * @param expr
+ */
+ public static void checkEqualOperand(Expression expr ) {
+ if( expr instanceof ConstantExpression ) {
+ Object value = ((ConstantExpression)expr).getValue();
+ if( value == null )
+ throw new RuntimeException("'"+expr+"' cannot be compared.");
+ }
+ }
+
+ /**
+ *
+ * @param left
+ * @param right
+ */
+ private static void checkEqualOperandCompatability(Expression left, Expression right) {
+ if( left instanceof ConstantExpression && right instanceof ConstantExpression ) {
+ if( left instanceof BooleanExpression && !(right instanceof BooleanExpression) )
+ throw new RuntimeException("'"+left+"' cannot be compared with '"+right+"'");
+ }
+ }
+
+
+
+ /**
+ * @param left
+ * @param right
+ */
+ public ComparisonExpression(Expression left, Expression right) {
+ super(left, right);
+ }
+
+ public Object evaluate(AMQMessage message) throws AMQException
+ {
+ Comparable lv = (Comparable) left.evaluate(message);
+ if (lv == null) {
+ return null;
+ }
+ Comparable rv = (Comparable) right.evaluate(message);
+ if (rv == null) {
+ return null;
+ }
+ return compare(lv, rv);
+ }
+
+ protected Boolean compare(Comparable lv, Comparable rv) {
+ Class lc = lv.getClass();
+ Class rc = rv.getClass();
+ // If the the objects are not of the same type,
+ // try to convert up to allow the comparison.
+ if (lc != rc) {
+ if (lc == Byte.class) {
+ if (rc == Short.class) {
+ lv = new Short(((Number) lv).shortValue());
+ }
+ else if (rc == Integer.class) {
+ lv = new Integer(((Number) lv).intValue());
+ }
+ else if (rc == Long.class) {
+ lv = new Long(((Number) lv).longValue());
+ }
+ else if (rc == Float.class) {
+ lv = new Float(((Number) lv).floatValue());
+ }
+ else if (rc == Double.class) {
+ lv = new Double(((Number) lv).doubleValue());
+ }
+ else {
+ return Boolean.FALSE;
+ }
+ } else if (lc == Short.class) {
+ if (rc == Integer.class) {
+ lv = new Integer(((Number) lv).intValue());
+ }
+ else if (rc == Long.class) {
+ lv = new Long(((Number) lv).longValue());
+ }
+ else if (rc == Float.class) {
+ lv = new Float(((Number) lv).floatValue());
+ }
+ else if (rc == Double.class) {
+ lv = new Double(((Number) lv).doubleValue());
+ }
+ else {
+ return Boolean.FALSE;
+ }
+ } else if (lc == Integer.class) {
+ if (rc == Long.class) {
+ lv = new Long(((Number) lv).longValue());
+ }
+ else if (rc == Float.class) {
+ lv = new Float(((Number) lv).floatValue());
+ }
+ else if (rc == Double.class) {
+ lv = new Double(((Number) lv).doubleValue());
+ }
+ else {
+ return Boolean.FALSE;
+ }
+ }
+ else if (lc == Long.class) {
+ if (rc == Integer.class) {
+ rv = new Long(((Number) rv).longValue());
+ }
+ else if (rc == Float.class) {
+ lv = new Float(((Number) lv).floatValue());
+ }
+ else if (rc == Double.class) {
+ lv = new Double(((Number) lv).doubleValue());
+ }
+ else {
+ return Boolean.FALSE;
+ }
+ }
+ else if (lc == Float.class) {
+ if (rc == Integer.class) {
+ rv = new Float(((Number) rv).floatValue());
+ }
+ else if (rc == Long.class) {
+ rv = new Float(((Number) rv).floatValue());
+ }
+ else if (rc == Double.class) {
+ lv = new Double(((Number) lv).doubleValue());
+ }
+ else {
+ return Boolean.FALSE;
+ }
+ }
+ else if (lc == Double.class) {
+ if (rc == Integer.class) {
+ rv = new Double(((Number) rv).doubleValue());
+ }
+ else if (rc == Long.class) {
+ rv = new Double(((Number) rv).doubleValue());
+ }
+ else if (rc == Float.class) {
+ rv = new Float(((Number) rv).doubleValue());
+ }
+ else {
+ return Boolean.FALSE;
+ }
+ }
+ else
+ return Boolean.FALSE;
+ }
+ return asBoolean(lv.compareTo(rv)) ? Boolean.TRUE : Boolean.FALSE;
+ }
+
+ protected abstract boolean asBoolean(int answer);
+
+ public boolean matches(AMQMessage message) throws AMQException {
+ Object object = evaluate(message);
+ return object!=null && object==Boolean.TRUE;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
new file mode 100644
index 0000000000..2cd305d4b1
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
@@ -0,0 +1,201 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
+
+import java.math.BigDecimal;
+
+import javax.jms.JMSException;
+
+/**
+ * Represents a constant expression
+ *
+ * @version $Revision$
+ */
+public class ConstantExpression implements Expression
+{
+
+ static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression
+ {
+ public BooleanConstantExpression(Object value)
+ {
+ super(value);
+ }
+
+ public boolean matches(AMQMessage message) throws AMQException
+ {
+ Object object = evaluate(message);
+ return object != null && object == Boolean.TRUE;
+ }
+ }
+
+ public static final BooleanConstantExpression NULL = new BooleanConstantExpression(null);
+ public static final BooleanConstantExpression TRUE = new BooleanConstantExpression(Boolean.TRUE);
+ public static final BooleanConstantExpression FALSE = new BooleanConstantExpression(Boolean.FALSE);
+
+ private Object value;
+
+ public static ConstantExpression createFromDecimal(String text)
+ {
+
+ // Strip off the 'l' or 'L' if needed.
+ if (text.endsWith("l") || text.endsWith("L"))
+ {
+ text = text.substring(0, text.length() - 1);
+ }
+
+ Number value;
+ try
+ {
+ value = new Long(text);
+ }
+ catch (NumberFormatException e)
+ {
+ // The number may be too big to fit in a long.
+ value = new BigDecimal(text);
+ }
+
+ long l = value.longValue();
+ if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE)
+ {
+ value = new Integer(value.intValue());
+ }
+ return new ConstantExpression(value);
+ }
+
+ public static ConstantExpression createFromHex(String text)
+ {
+ Number value = new Long(Long.parseLong(text.substring(2), 16));
+ long l = value.longValue();
+ if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE)
+ {
+ value = new Integer(value.intValue());
+ }
+ return new ConstantExpression(value);
+ }
+
+ public static ConstantExpression createFromOctal(String text)
+ {
+ Number value = new Long(Long.parseLong(text, 8));
+ long l = value.longValue();
+ if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE)
+ {
+ value = new Integer(value.intValue());
+ }
+ return new ConstantExpression(value);
+ }
+
+ public static ConstantExpression createFloat(String text)
+ {
+ Number value = new Double(text);
+ return new ConstantExpression(value);
+ }
+
+ public ConstantExpression(Object value)
+ {
+ this.value = value;
+ }
+
+ public Object evaluate(AMQMessage message) throws AMQException
+ {
+ return value;
+ }
+
+ public Object getValue()
+ {
+ return value;
+ }
+
+ /**
+ * @see java.lang.Object#toString()
+ */
+ public String toString()
+ {
+ if (value == null)
+ {
+ return "NULL";
+ }
+ if (value instanceof Boolean)
+ {
+ return ((Boolean) value).booleanValue() ? "TRUE" : "FALSE";
+ }
+ if (value instanceof String)
+ {
+ return encodeString((String) value);
+ }
+ return value.toString();
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#hashCode()
+ */
+ public int hashCode()
+ {
+ return toString().hashCode();
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ public boolean equals(Object o)
+ {
+
+ if (o == null || !this.getClass().equals(o.getClass()))
+ {
+ return false;
+ }
+ return toString().equals(o.toString());
+
+ }
+
+
+ /**
+ * Encodes the value of string so that it looks like it would look like
+ * when it was provided in a selector.
+ *
+ * @param s
+ * @return
+ */
+ public static String encodeString(String s)
+ {
+ StringBuffer b = new StringBuffer();
+ b.append('\'');
+ for (int i = 0; i < s.length(); i++)
+ {
+ char c = s.charAt(i);
+ if (c == '\'')
+ {
+ b.append(c);
+ }
+ b.append(c);
+ }
+ b.append('\'');
+ return b.toString();
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
new file mode 100644
index 0000000000..3b5debd3ee
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
@@ -0,0 +1,44 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
+
+import javax.jms.JMSException;
+
+
+/**
+ * Represents an expression
+ *
+ * @version $Revision$
+ */
+public interface Expression
+{
+
+ /**
+ * @return the value of this expression
+ */
+ public Object evaluate(AMQMessage message) throws AMQException;
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
new file mode 100644
index 0000000000..c82de9fa15
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpid.server.queue.AMQMessage;
+
+public interface FilterManager
+{
+ void add(MessageFilter filter);
+
+ void remove(MessageFilter filter);
+
+ boolean allAllow(AMQMessage msg);
+
+ boolean hasFilters();
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
new file mode 100644
index 0000000000..49f99132ef
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.common.AMQPFilterTypes;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+
+
+public class FilterManagerFactory
+{
+ //private final static Logger _logger = LoggerFactory.getLogger(FilterManagerFactory.class);
+ private final static org.apache.log4j.Logger _logger = org.apache.log4j.Logger.getLogger(FilterManagerFactory.class);
+
+ //fixme move to a common class so it can be refered to from client code.
+
+ public static FilterManager createManager(FieldTable filters) throws AMQException
+ {
+ FilterManager manager = null;
+
+ if (filters != null)
+ {
+
+ manager = new SimpleFilterManager();
+
+ Iterator it = filters.keySet().iterator();
+ _logger.info("Processing filters:");
+ while (it.hasNext())
+ {
+ String key = (String) it.next();
+ _logger.info("filter:" + key);
+ if (key.equals(AMQPFilterTypes.JMS_SELECTOR.getValue()))
+ {
+ String selector = (String) filters.get(key);
+
+ if (selector != null && !selector.equals(""))
+ {
+ manager.add(new JMSSelectorFilter(selector));
+ }
+ }
+
+ if (key.equals(AMQPFilterTypes.NO_CONSUME.getValue()))
+ {
+ manager.add(new NoConsumerFilter());
+ }
+
+ }
+
+ //If we added no filters don't bear the overhead of having an filter manager
+ if (!manager.hasFilters())
+ {
+ manager = null;
+ }
+ }
+ else
+ {
+ _logger.info("No Filters found.");
+ }
+
+
+ return manager;
+
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
new file mode 100644
index 0000000000..5f505fbeba
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.filter.jms.selector.SelectorParser;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.log4j.Logger;
+
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+
+public class JMSSelectorFilter implements MessageFilter
+{
+ private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class);
+
+ private String _selector;
+ private BooleanExpression _matcher;
+
+ public JMSSelectorFilter(String selector) throws AMQException
+ {
+ _selector = selector;
+ _logger.info("Created JMSSelectorFilter with selector:" + _selector);
+
+
+ try
+ {
+ _matcher = new SelectorParser().parse(selector);
+ }
+ catch (InvalidSelectorException e)
+ {
+ // fixme
+ // Is this the correct way of throwing exception
+ throw new AMQInvalidSelectorException(e.getMessage());
+ }
+
+ }
+
+ public boolean matches(AMQMessage message)
+ {
+ try
+ {
+ boolean match = _matcher.matches(message);
+ _logger.info(message + " match(" + match + ") selector(" + System.identityHashCode(_selector) + "):" + _selector);
+ return match;
+ }
+ catch (AMQException e)
+ {
+ //fixme this needs to be sorted.. it shouldn't happen
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ return false;
+ }
+
+ public String getSelector()
+ {
+ return _selector;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
new file mode 100644
index 0000000000..e6ad98cb8b
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
@@ -0,0 +1,96 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
+
+import javax.jms.JMSException;
+
+/**
+ * A filter performing a comparison of two objects
+ *
+ * @version $Revision$
+ */
+public abstract class LogicExpression extends BinaryExpression implements BooleanExpression {
+
+ public static BooleanExpression createOR(BooleanExpression lvalue, BooleanExpression rvalue) {
+ return new LogicExpression(lvalue, rvalue) {
+
+ public Object evaluate(AMQMessage message) throws AMQException {
+
+ Boolean lv = (Boolean) left.evaluate(message);
+ // Can we do an OR shortcut??
+ if (lv !=null && lv.booleanValue()) {
+ return Boolean.TRUE;
+ }
+
+ Boolean rv = (Boolean) right.evaluate(message);
+ return rv==null ? null : rv;
+ }
+
+ public String getExpressionSymbol() {
+ return "OR";
+ }
+ };
+ }
+
+ public static BooleanExpression createAND(BooleanExpression lvalue, BooleanExpression rvalue) {
+ return new LogicExpression(lvalue, rvalue) {
+
+ public Object evaluate(AMQMessage message) throws AMQException {
+
+ Boolean lv = (Boolean) left.evaluate(message);
+
+ // Can we do an AND shortcut??
+ if (lv == null)
+ return null;
+ if (!lv.booleanValue()) {
+ return Boolean.FALSE;
+ }
+
+ Boolean rv = (Boolean) right.evaluate(message);
+ return rv == null ? null : rv;
+ }
+
+ public String getExpressionSymbol() {
+ return "AND";
+ }
+ };
+ }
+
+ /**
+ * @param left
+ * @param right
+ */
+ public LogicExpression(BooleanExpression left, BooleanExpression right) {
+ super(left, right);
+ }
+
+ abstract public Object evaluate(AMQMessage message) throws AMQException;
+
+ public boolean matches(AMQMessage message) throws AMQException {
+ Object object = evaluate(message);
+ return object!=null && object==Boolean.TRUE;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
new file mode 100644
index 0000000000..b8ca75d209
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import org.apache.qpid.server.queue.AMQMessage;
+
+import javax.jms.JMSException;
+
+public interface MessageFilter
+{
+ boolean matches(AMQMessage message) throws JMSException;
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
new file mode 100644
index 0000000000..5c58b73ea3
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+
+public class NoConsumerFilter implements MessageFilter
+{
+ private final static Logger _logger = org.apache.log4j.Logger.getLogger(NoConsumerFilter.class);
+
+
+ public NoConsumerFilter() throws AMQException
+ {
+ _logger.info("Created NoConsumerFilter");
+ }
+
+ public boolean matches(AMQMessage message)
+ {
+ return true;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
new file mode 100644
index 0000000000..7d6a98df84
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
@@ -0,0 +1,305 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+
+//import org.apache.activemq.command.ActiveMQDestination;
+//import org.apache.activemq.command.Message;
+//import org.apache.activemq.command.TransactionId;
+//import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.log4j.Logger;
+
+/**
+ * Represents a property expression
+ *
+ * @version $Revision$
+ */
+public class PropertyExpression implements Expression
+{
+
+ interface SubExpression
+ {
+ public Object evaluate(AMQMessage message) throws AMQException;
+ }
+
+ interface JMSExpression
+ {
+ public abstract Object evaluate(JMSMessage message);
+ }
+
+ static class SubJMSExpression implements SubExpression
+ {
+ JMSExpression _expression;
+
+ SubJMSExpression(JMSExpression expression)
+ {
+ _expression = expression;
+ }
+
+
+ public Object evaluate(AMQMessage message) throws AMQException
+ {
+ JMSMessage msg = (JMSMessage) message.getDecodedMessage(AMQMessage.JMS_MESSAGE);
+ if (msg != null)
+ {
+ return _expression.evaluate(msg);
+ }
+ else
+ {
+ return null;
+ }
+ }
+ }
+
+ private final static Logger _logger = org.apache.log4j.Logger.getLogger(PropertyExpression.class);
+
+
+ static final private HashMap JMS_PROPERTY_EXPRESSIONS = new HashMap();
+
+ static
+ {
+ JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getJMSDestination();
+ }
+ }
+ ));
+//
+// public Object evaluate(AMQMessage message)
+// {
+// //fixme
+//
+//
+//// AMQDestination dest = message.getOriginalDestination();
+//// if (dest == null)
+//// {
+//// dest = message.getDestination();
+//// }
+//// if (dest == null)
+//// {
+//// return null;
+//// }
+//// return dest.toString();
+// return "";
+// }
+// });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getJMSReplyTo();
+ }
+ })
+ );
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSType", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getJMSType();
+ }
+ }
+ ));
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ try
+ {
+ Integer mode = new Integer(message.getAMQMessage().isPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+ _logger.info("JMSDeliveryMode is :" + mode);
+ return mode;
+ }
+ catch (AMQException e)
+ {
+ //shouldn't happen
+ }
+
+ return DeliveryMode.NON_PERSISTENT;
+ }
+ }));
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getJMSPriority();
+ }
+ }
+ ));
+
+
+ JMS_PROPERTY_EXPRESSIONS.put("AMQMessageID", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getAMQMessage().getMessageId();
+ }
+ }
+ ));
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getJMSTimestamp();
+ }
+ }
+ ));
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getJMSCorrelationID();
+ }
+ }
+ ));
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getJMSExpiration();
+ }
+ }
+ ));
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getAMQMessage().isRedelivered();
+ }
+ }
+ ));
+
+ }
+
+ private final String name;
+ private final SubExpression jmsPropertyExpression;
+
+ public PropertyExpression(String name)
+ {
+ this.name = name;
+ jmsPropertyExpression = (SubExpression) JMS_PROPERTY_EXPRESSIONS.get(name);
+ }
+
+ public Object evaluate(AMQMessage message) throws AMQException
+ {
+// try
+// {
+// if (message.isDropped())
+// {
+// return null;
+// }
+
+ if (jmsPropertyExpression != null)
+ {
+ return jmsPropertyExpression.evaluate(message);
+ }
+// try
+ else
+ {
+
+ BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
+
+ _logger.info("Looking up property:" + name);
+ _logger.info("Properties are:" + _properties.getHeaders().keySet());
+
+ return _properties.getHeaders().get(name);
+ }
+// catch (IOException ioe)
+// {
+// JMSException exception = new JMSException("Could not get property: " + name + " reason: " + ioe.getMessage());
+// exception.initCause(ioe);
+// throw exception;
+// }
+// }
+// catch (IOException e)
+// {
+// JMSException exception = new JMSException(e.getMessage());
+// exception.initCause(e);
+// throw exception;
+// }
+
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+
+ /**
+ * @see java.lang.Object#toString()
+ */
+ public String toString()
+ {
+ return name;
+ }
+
+ /**
+ * @see java.lang.Object#hashCode()
+ */
+ public int hashCode()
+ {
+ return name.hashCode();
+ }
+
+ /**
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ public boolean equals(Object o)
+ {
+
+ if (o == null || !this.getClass().equals(o.getClass()))
+ {
+ return false;
+ }
+ return name.equals(((PropertyExpression) o).name);
+
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java b/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
new file mode 100644
index 0000000000..dc2c2c0e6c
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class SimpleFilterManager implements FilterManager
+{
+ private final Logger _logger = LoggerFactory.getLogger(SimpleFilterManager.class);
+
+ private final ConcurrentLinkedQueue<MessageFilter> _filters;
+
+ public SimpleFilterManager()
+ {
+ _logger.debug("Creating SimpleFilterManager");
+ _filters = new ConcurrentLinkedQueue<MessageFilter>();
+ }
+
+ public void add(MessageFilter filter)
+ {
+ _filters.add(filter);
+ }
+
+ public void remove(MessageFilter filter)
+ {
+ _filters.remove(filter);
+ }
+
+ public boolean allAllow(AMQMessage msg)
+ {
+ for (MessageFilter filter : _filters)
+ {
+ try
+ {
+ if (!filter.matches(msg))
+ {
+ return false;
+ }
+ }
+ catch (JMSException e)
+ {
+ //fixme
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public boolean hasFilters()
+ {
+ return !_filters.isEmpty();
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
new file mode 100644
index 0000000000..abc56f04d0
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
@@ -0,0 +1,265 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
+
+import java.math.BigDecimal;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.jms.JMSException;
+
+/**
+ * An expression which performs an operation on two expression values
+ *
+ * @version $Revision$
+ */
+public abstract class UnaryExpression implements Expression {
+
+ private static final BigDecimal BD_LONG_MIN_VALUE = BigDecimal.valueOf(Long.MIN_VALUE);
+ protected Expression right;
+
+ public static Expression createNegate(Expression left) {
+ return new UnaryExpression(left) {
+ public Object evaluate(AMQMessage message) throws AMQException
+ {
+ Object rvalue = right.evaluate(message);
+ if (rvalue == null) {
+ return null;
+ }
+ if (rvalue instanceof Number) {
+ return negate((Number) rvalue);
+ }
+ return null;
+ }
+
+ public String getExpressionSymbol() {
+ return "-";
+ }
+ };
+ }
+
+ public static BooleanExpression createInExpression(PropertyExpression right, List elements, final boolean not) {
+
+ // Use a HashSet if there are many elements.
+ Collection t;
+ if( elements.size()==0 )
+ t=null;
+ else if( elements.size() < 5 )
+ t = elements;
+ else {
+ t = new HashSet(elements);
+ }
+ final Collection inList = t;
+
+ return new BooleanUnaryExpression(right) {
+ public Object evaluate(AMQMessage message) throws AMQException {
+
+ Object rvalue = right.evaluate(message);
+ if (rvalue == null) {
+ return null;
+ }
+ if( rvalue.getClass()!=String.class )
+ return null;
+
+ if( (inList!=null && inList.contains(rvalue)) ^ not ) {
+ return Boolean.TRUE;
+ } else {
+ return Boolean.FALSE;
+ }
+
+ }
+
+ public String toString() {
+ StringBuffer answer = new StringBuffer();
+ answer.append(right);
+ answer.append(" ");
+ answer.append(getExpressionSymbol());
+ answer.append(" ( ");
+
+ int count=0;
+ for (Iterator i = inList.iterator(); i.hasNext();) {
+ Object o = (Object) i.next();
+ if( count!=0 ) {
+ answer.append(", ");
+ }
+ answer.append(o);
+ count++;
+ }
+
+ answer.append(" )");
+ return answer.toString();
+ }
+
+ public String getExpressionSymbol() {
+ if( not )
+ return "NOT IN";
+ else
+ return "IN";
+ }
+ };
+ }
+
+ abstract static class BooleanUnaryExpression extends UnaryExpression implements BooleanExpression {
+ public BooleanUnaryExpression(Expression left) {
+ super(left);
+ }
+
+ public boolean matches(AMQMessage message) throws AMQException {
+ Object object = evaluate(message);
+ return object!=null && object==Boolean.TRUE;
+ }
+ };
+
+
+ public static BooleanExpression createNOT(BooleanExpression left) {
+ return new BooleanUnaryExpression(left) {
+ public Object evaluate(AMQMessage message) throws AMQException {
+ Boolean lvalue = (Boolean) right.evaluate(message);
+ if (lvalue == null) {
+ return null;
+ }
+ return lvalue.booleanValue() ? Boolean.FALSE : Boolean.TRUE;
+ }
+
+ public String getExpressionSymbol() {
+ return "NOT";
+ }
+ };
+ }
+
+ public static BooleanExpression createXPath(final String xpath) {
+ return new XPathExpression(xpath);
+ }
+
+ public static BooleanExpression createXQuery(final String xpath) {
+ return new XQueryExpression(xpath);
+ }
+
+ public static BooleanExpression createBooleanCast(Expression left) {
+ return new BooleanUnaryExpression(left) {
+ public Object evaluate(AMQMessage message) throws AMQException {
+ Object rvalue = right.evaluate(message);
+ if (rvalue == null)
+ return null;
+ if (!rvalue.getClass().equals(Boolean.class))
+ return Boolean.FALSE;
+ return ((Boolean)rvalue).booleanValue() ? Boolean.TRUE : Boolean.FALSE;
+ }
+
+ public String toString() {
+ return right.toString();
+ }
+
+ public String getExpressionSymbol() {
+ return "";
+ }
+ };
+ }
+
+ private static Number negate(Number left) {
+ Class clazz = left.getClass();
+ if (clazz == Integer.class) {
+ return new Integer(-left.intValue());
+ }
+ else if (clazz == Long.class) {
+ return new Long(-left.longValue());
+ }
+ else if (clazz == Float.class) {
+ return new Float(-left.floatValue());
+ }
+ else if (clazz == Double.class) {
+ return new Double(-left.doubleValue());
+ }
+ else if (clazz == BigDecimal.class) {
+ // We ussually get a big deciamal when we have Long.MIN_VALUE constant in the
+ // Selector. Long.MIN_VALUE is too big to store in a Long as a positive so we store it
+ // as a Big decimal. But it gets Negated right away.. to here we try to covert it back
+ // to a Long.
+ BigDecimal bd = (BigDecimal)left;
+ bd = bd.negate();
+
+ if( BD_LONG_MIN_VALUE.compareTo(bd)==0 ) {
+ return new Long(Long.MIN_VALUE);
+ }
+ return bd;
+ }
+ else {
+ throw new RuntimeException("Don't know how to negate: "+left);
+ }
+ }
+
+ public UnaryExpression(Expression left) {
+ this.right = left;
+ }
+
+ public Expression getRight() {
+ return right;
+ }
+
+ public void setRight(Expression expression) {
+ right = expression;
+ }
+
+ /**
+ * @see java.lang.Object#toString()
+ */
+ public String toString() {
+ return "(" + getExpressionSymbol() + " " + right.toString() + ")";
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#hashCode()
+ */
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ public boolean equals(Object o) {
+
+ if (o == null || !this.getClass().equals(o.getClass())) {
+ return false;
+ }
+ return toString().equals(o.toString());
+
+ }
+
+ /**
+ * Returns the symbol that represents this binary expression. For example, addition is
+ * represented by "+"
+ *
+ * @return
+ */
+ abstract public String getExpressionSymbol();
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
new file mode 100644
index 0000000000..85402e0781
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
@@ -0,0 +1,132 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import javax.jms.JMSException;
+
+//import org.apache.activemq.command.Message;
+//import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.AMQException;
+
+/**
+ * Used to evaluate an XPath Expression in a JMS selector.
+ */
+public final class XPathExpression implements BooleanExpression {
+
+ private static final Log log = LogFactory.getLog(XPathExpression.class);
+ private static final String EVALUATOR_SYSTEM_PROPERTY = "org.apache.qpid.server.filter.XPathEvaluatorClassName";
+ private static final String DEFAULT_EVALUATOR_CLASS_NAME=XalanXPathEvaluator.class.getName();
+
+ private static final Constructor EVALUATOR_CONSTRUCTOR;
+
+ static {
+ String cn = System.getProperty(EVALUATOR_SYSTEM_PROPERTY, DEFAULT_EVALUATOR_CLASS_NAME);
+ Constructor m = null;
+ try {
+ try {
+ m = getXPathEvaluatorConstructor(cn);
+ } catch (Throwable e) {
+ log.warn("Invalid "+XPathEvaluator.class.getName()+" implementation: "+cn+", reason: "+e,e);
+ cn = DEFAULT_EVALUATOR_CLASS_NAME;
+ try {
+ m = getXPathEvaluatorConstructor(cn);
+ } catch (Throwable e2) {
+ log.error("Default XPath evaluator could not be loaded",e);
+ }
+ }
+ } finally {
+ EVALUATOR_CONSTRUCTOR = m;
+ }
+ }
+
+ private static Constructor getXPathEvaluatorConstructor(String cn) throws ClassNotFoundException, SecurityException, NoSuchMethodException {
+ Class c = XPathExpression.class.getClassLoader().loadClass(cn);
+ if( !XPathEvaluator.class.isAssignableFrom(c) ) {
+ throw new ClassCastException(""+c+" is not an instance of "+XPathEvaluator.class);
+ }
+ return c.getConstructor(new Class[]{String.class});
+ }
+
+ private final String xpath;
+ private final XPathEvaluator evaluator;
+
+ static public interface XPathEvaluator {
+ public boolean evaluate(AMQMessage message) throws AMQException;
+ }
+
+ XPathExpression(String xpath) {
+ this.xpath = xpath;
+ this.evaluator = createEvaluator(xpath);
+ }
+
+ private XPathEvaluator createEvaluator(String xpath2) {
+ try {
+ return (XPathEvaluator)EVALUATOR_CONSTRUCTOR.newInstance(new Object[]{xpath});
+ } catch (InvocationTargetException e) {
+ Throwable cause = e.getCause();
+ if( cause instanceof RuntimeException ) {
+ throw (RuntimeException)cause;
+ }
+ throw new RuntimeException("Invalid XPath Expression: "+xpath+" reason: "+e.getMessage(), e);
+ } catch (Throwable e) {
+ throw new RuntimeException("Invalid XPath Expression: "+xpath+" reason: "+e.getMessage(), e);
+ }
+ }
+
+ public Object evaluate(AMQMessage message) throws AMQException {
+// try {
+//FIXME this is flow to disk work
+// if( message.isDropped() )
+// return null;
+ return evaluator.evaluate(message) ? Boolean.TRUE : Boolean.FALSE;
+// } catch (IOException e) {
+//
+// JMSException exception = new JMSException(e.getMessage());
+// exception.initCause(e);
+// throw exception;
+//
+// }
+
+ }
+
+ public String toString() {
+ return "XPATH "+ConstantExpression.encodeString(xpath);
+ }
+
+ /**
+ * @param message
+ * @return true if the expression evaluates to Boolean.TRUE.
+ * @throws JMSException
+ */
+ public boolean matches(AMQMessage message) throws AMQException
+ {
+ Object object = evaluate(message);
+ return object!=null && object==Boolean.TRUE;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
new file mode 100644
index 0000000000..da8a61650a
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
@@ -0,0 +1,58 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.AMQException;
+
+import javax.jms.JMSException;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+/**
+ * Used to evaluate an XQuery Expression in a JMS selector.
+ */
+public final class XQueryExpression implements BooleanExpression {
+ private final String xpath;
+
+ XQueryExpression(String xpath) {
+ super();
+ this.xpath = xpath;
+ }
+
+ public Object evaluate(AMQMessage message) throws AMQException {
+ return Boolean.FALSE;
+ }
+
+ public String toString() {
+ return "XQUERY "+ConstantExpression.encodeString(xpath);
+ }
+
+ /**
+ * @param message
+ * @return true if the expression evaluates to Boolean.TRUE.
+ * @throws JMSException
+ */
+ public boolean matches(AMQMessage message) throws AMQException
+ {
+ Object object = evaluate(message);
+ return object!=null && object==Boolean.TRUE;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java b/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
new file mode 100644
index 0000000000..f74e0cedec
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
@@ -0,0 +1,109 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import java.io.StringReader;
+import java.io.ByteArrayInputStream;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+
+//import org.apache.activemq.command.Message;
+//import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.xpath.CachedXPathAPI;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.AMQException;
+import org.w3c.dom.Document;
+import org.w3c.dom.traversal.NodeIterator;
+import org.xml.sax.InputSource;
+
+public class XalanXPathEvaluator implements XPathExpression.XPathEvaluator {
+
+ private final String xpath;
+
+ public XalanXPathEvaluator(String xpath) {
+ this.xpath = xpath;
+ }
+
+ public boolean evaluate(AMQMessage m) throws AMQException
+ {
+ try
+ {
+
+ if( m instanceof TextMessage ) {
+ String text = ((TextMessage)m).getText();
+ return evaluate(text);
+ } else if ( m instanceof BytesMessage ) {
+ BytesMessage bm = (BytesMessage) m;
+ byte data[] = new byte[(int) bm.getBodyLength()];
+ bm.readBytes(data);
+ return evaluate(data);
+ }
+ return false;
+ }
+ catch (JMSException e)
+ {
+ throw new AMQException("Error evaluting message: " + e, e);
+ }
+ }
+
+ private boolean evaluate(byte[] data) {
+ try {
+
+ InputSource inputSource = new InputSource(new ByteArrayInputStream(data));
+
+ DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+ factory.setNamespaceAware(true);
+ DocumentBuilder dbuilder = factory.newDocumentBuilder();
+ Document doc = dbuilder.parse(inputSource);
+
+ CachedXPathAPI cachedXPathAPI = new CachedXPathAPI();
+ NodeIterator iterator = cachedXPathAPI.selectNodeIterator(doc,xpath);
+ return iterator.nextNode()!=null;
+
+ } catch (Throwable e) {
+ return false;
+ }
+ }
+
+ private boolean evaluate(String text) {
+ try {
+ InputSource inputSource = new InputSource(new StringReader(text));
+
+ DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+ factory.setNamespaceAware(true);
+ DocumentBuilder dbuilder = factory.newDocumentBuilder();
+ Document doc = dbuilder.parse(inputSource);
+
+ // We should associated the cachedXPathAPI object with the message being evaluated
+ // since that should speedup subsequent xpath expressions.
+ CachedXPathAPI cachedXPathAPI = new CachedXPathAPI();
+ NodeIterator iterator = cachedXPathAPI.selectNodeIterator(doc,xpath);
+ return iterator.nextNode()!=null;
+ } catch (Throwable e) {
+ return false;
+ }
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index 8b3ced9811..164094ac58 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -21,10 +21,12 @@
package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.BasicConsumeOkBody;
import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ConsumerTagNotUniqueException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -32,6 +34,7 @@ import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.log4j.Logger;
@@ -74,8 +77,9 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
}
try
{
- String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck);
- if(!body.nowait)
+ String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
+ body.arguments, body.noLocal);
+ if (!body.nowait)
{
session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag));
}
@@ -83,10 +87,19 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
//now allow queue to start async processing of any backlog of messages
queue.deliverAsync();
}
+ catch (AMQInvalidSelectorException ise)
+ {
+ _log.info("Closing connection due to invalid selector");
+ session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, AMQConstant.INVALID_SELECTOR.getCode(),
+ ise.getMessage(), BasicConsumeBody.CLASS_ID,
+ BasicConsumeBody.METHOD_ID));
+ }
catch (ConsumerTagNotUniqueException e)
{
String msg = "Non-unique consumer tag, '" + body.consumerTag + "'";
- session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg, BasicConsumeBody.CLASS_ID, BasicConsumeBody.METHOD_ID));
+ session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg,
+ BasicConsumeBody.CLASS_ID,
+ BasicConsumeBody.METHOD_ID));
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
index 00ae547683..79b2e11bca 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
@@ -78,12 +78,19 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
AuthenticationResult authResult = authMgr.authenticate(ss, body.response);
+ //save clientProperties
+ if (protocolSession.getClientProperties() == null)
+ {
+ protocolSession.setClientProperties(body.clientProperties);
+ }
+
switch (authResult.status)
{
case ERROR:
throw new AMQException("Authentication failed");
case SUCCESS:
_logger.info("Connected as: " + ss.getAuthorizationID());
+
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, Integer.MAX_VALUE, getConfiguredFrameSize(),
HeartbeatConfig.getInstance().getDelay());
@@ -122,7 +129,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
static int getConfiguredFrameSize()
{
final Configuration config = ApplicationRegistry.getInstance().getConfiguration();
- final int framesize = config.getInt("advanced.framesize", DEFAULT_FRAME_SIZE);
+ final int framesize = config.getInt("advanced.framesize", DEFAULT_FRAME_SIZE);
_logger.info("Framesize set to " + framesize);
return framesize;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java b/java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java
new file mode 100644
index 0000000000..aba3b88a59
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.message;
+
+public interface MessageDecorator
+{
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java b/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java
new file mode 100644
index 0000000000..376f88cbf1
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.message.jms;
+
+import org.apache.qpid.server.message.MessageDecorator;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.AMQException;
+
+import javax.jms.Message;
+import javax.jms.JMSException;
+import javax.jms.Destination;
+import javax.jms.MessageNotWriteableException;
+import java.util.Enumeration;
+
+public class JMSMessage implements MessageDecorator
+{
+
+ private AMQMessage _message;
+ private BasicContentHeaderProperties _properties;
+
+ public JMSMessage(AMQMessage message) throws AMQException
+ {
+ _message = message;
+ ContentHeaderBody contentHeader = message.getContentHeaderBody();
+ _properties = (BasicContentHeaderProperties) contentHeader.properties;
+ }
+
+ protected void checkWriteable() throws MessageNotWriteableException
+ {
+ //The broker should not modify a message.
+// if (_readableMessage)
+ {
+ throw new MessageNotWriteableException("The broker should not modify a message.");
+ }
+ }
+
+
+ public String getJMSMessageID()
+ {
+ return _properties.getMessageId();
+ }
+
+ public void setJMSMessageID(String string) throws MessageNotWriteableException
+ {
+ checkWriteable();
+ _properties.setMessageId(string);
+ }
+
+ public long getJMSTimestamp()
+ {
+ return _properties.getTimestamp();
+ }
+
+ public void setJMSTimestamp(long l) throws MessageNotWriteableException
+ {
+ checkWriteable();
+ _properties.setTimestamp(l);
+ }
+
+ public byte[] getJMSCorrelationIDAsBytes()
+ {
+ return _properties.getCorrelationId().getBytes();
+ }
+
+// public void setJMSCorrelationIDAsBytes(byte[] bytes)
+// {
+// }
+
+ public void setJMSCorrelationID(String string) throws MessageNotWriteableException
+ {
+ checkWriteable();
+ _properties.setCorrelationId(string);
+ }
+
+ public String getJMSCorrelationID()
+ {
+ return _properties.getCorrelationId();
+ }
+
+ public String getJMSReplyTo()
+ {
+ return _properties.getReplyTo();
+ }
+
+ public void setJMSReplyTo(Destination destination) throws MessageNotWriteableException
+ {
+ checkWriteable();
+ _properties.setReplyTo(destination.toString());
+ }
+
+ public String getJMSDestination()
+ {
+ //fixme should be a deestination
+ return "";
+ }
+
+ public void setJMSDestination(Destination destination) throws MessageNotWriteableException
+ {
+ checkWriteable();
+ //_properties.setDestination(destination.toString());
+ }
+
+ public int getJMSDeliveryMode()
+ {
+ return _properties.getDeliveryMode();
+ }
+
+ public void setJMSDeliveryMode(byte i) throws MessageNotWriteableException
+ {
+ checkWriteable();
+ _properties.setDeliveryMode(i);
+ }
+
+ public boolean getJMSRedelivered()
+ {
+ return _message.isRedelivered();
+ }
+
+ public void setJMSRedelivered(boolean b) throws MessageNotWriteableException
+ {
+ checkWriteable();
+ _message.setRedelivered(b);
+ }
+
+ public String getJMSType()
+ {
+ return _properties.getType();
+ }
+
+ public void setJMSType(String string) throws MessageNotWriteableException
+ {
+ checkWriteable();
+ _properties.setType(string);
+ }
+
+ public long getJMSExpiration()
+ {
+ return _properties.getExpiration();
+ }
+
+ public void setJMSExpiration(long l) throws MessageNotWriteableException
+ {
+ checkWriteable();
+ _properties.setExpiration(l);
+ }
+
+ public int getJMSPriority()
+ {
+ return _properties.getPriority();
+ }
+
+ public void setJMSPriority(byte i) throws MessageNotWriteableException
+ {
+ checkWriteable();
+ _properties.setPriority(i);
+ }
+
+ public void clearProperties() throws MessageNotWriteableException
+ {
+ checkWriteable();
+ _properties.getJMSHeaders().clear();
+ }
+
+ public boolean propertyExists(String string)
+ {
+ return _properties.getJMSHeaders().propertyExists(string);
+ }
+
+ public boolean getBooleanProperty(String string) throws JMSException
+ {
+ return _properties.getJMSHeaders().getBoolean(string);
+ }
+
+ public byte getByteProperty(String string) throws JMSException
+ {
+ return _properties.getJMSHeaders().getByte(string);
+ }
+
+ public short getShortProperty(String string) throws JMSException
+ {
+ return _properties.getJMSHeaders().getShort(string);
+ }
+
+ public int getIntProperty(String string) throws JMSException
+ {
+ return _properties.getJMSHeaders().getInteger(string);
+ }
+
+ public long getLongProperty(String string) throws JMSException
+ {
+ return _properties.getJMSHeaders().getLong(string);
+ }
+
+ public float getFloatProperty(String string) throws JMSException
+ {
+ return _properties.getJMSHeaders().getFloat(string);
+ }
+
+ public double getDoubleProperty(String string) throws JMSException
+ {
+ return _properties.getJMSHeaders().getDouble(string);
+ }
+
+ public String getStringProperty(String string) throws JMSException
+ {
+ return _properties.getJMSHeaders().getString(string);
+ }
+
+ public Object getObjectProperty(String string) throws JMSException
+ {
+ return _properties.getJMSHeaders().getObject(string);
+ }
+
+ public Enumeration getPropertyNames()
+ {
+ return _properties.getJMSHeaders().getPropertyNames();
+ }
+
+ public void setBooleanProperty(String string, boolean b) throws JMSException
+ {
+ checkWriteable();
+ _properties.getJMSHeaders().setBoolean(string, b);
+ }
+
+ public void setByteProperty(String string, byte b) throws JMSException
+ {
+ checkWriteable();
+ _properties.getJMSHeaders().setByte(string, b);
+ }
+
+ public void setShortProperty(String string, short i) throws JMSException
+ {
+ checkWriteable();
+ _properties.getJMSHeaders().setShort(string, i);
+ }
+
+ public void setIntProperty(String string, int i) throws JMSException
+ {
+ checkWriteable();
+ _properties.getJMSHeaders().setInteger(string, i);
+ }
+
+ public void setLongProperty(String string, long l) throws JMSException
+ {
+ checkWriteable();
+ _properties.getJMSHeaders().setLong(string, l);
+ }
+
+ public void setFloatProperty(String string, float v) throws JMSException
+ {
+ checkWriteable();
+ _properties.getJMSHeaders().setFloat(string, v);
+ }
+
+ public void setDoubleProperty(String string, double v) throws JMSException
+ {
+ checkWriteable();
+ _properties.getJMSHeaders().setDouble(string, v);
+ }
+
+ public void setStringProperty(String string, String string1) throws JMSException
+ {
+ checkWriteable();
+ _properties.getJMSHeaders().setString(string, string1);
+ }
+
+ public void setObjectProperty(String string, Object object) throws JMSException
+ {
+ checkWriteable();
+ _properties.getJMSHeaders().setObject(string, object);
+ }
+
+ public void acknowledge() throws MessageNotWriteableException
+ {
+ checkWriteable();
+ }
+
+ public void clearBody() throws MessageNotWriteableException
+ {
+ checkWriteable();
+ }
+
+ public AMQMessage getAMQMessage()
+ {
+ return _message;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 1832f01b7a..8f10a06fe4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -26,9 +26,19 @@ import org.apache.mina.common.IoSession;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
-import org.apache.qpid.framing.*;
+
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.management.Managable;
@@ -84,6 +94,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
/* AMQP Version for this session */
private byte _major;
private byte _minor;
+ private FieldTable _clientProperties;
public ManagedObject getManagedObject()
{
@@ -119,7 +130,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
return new AMQProtocolSessionMBean(this);
}
- catch(JMException ex)
+ catch (JMException ex)
{
_logger.error("AMQProtocolSession MBean creation has failed ", ex);
throw new AMQException("AMQProtocolSession MBean creation has failed ", ex);
@@ -144,7 +155,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
ProtocolInitiation pi = (ProtocolInitiation) message;
// this ensures the codec never checks for a PI message again
- ((AMQDecoder)_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
+ ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false);
try
{
pi.checkVersion(this); // Fails if not correct
@@ -153,7 +164,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_minor = pi.protocolMinor;
String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
String locales = "en_US";
- AMQFrame response = ConnectionStartBody.createAMQFrame((short)0, pi.protocolMajor, pi.protocolMinor, null,
+ AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, pi.protocolMajor, pi.protocolMinor, null,
mechanisms.getBytes(), locales.getBytes());
_minaProtocolSession.write(response);
}
@@ -195,7 +206,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_logger.debug("Method frame received: " + frame);
}
final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel,
- (AMQMethodBody)frame.bodyFrame);
+ (AMQMethodBody) frame.bodyFrame);
try
{
boolean wasAnyoneInterested = false;
@@ -250,7 +261,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.debug("Content header frame received: " + frame);
}
- getChannel(frame.channel).publishContentHeader((ContentHeaderBody)frame.bodyFrame);
+ getChannel(frame.channel).publishContentHeader((ContentHeaderBody) frame.bodyFrame);
}
private void contentBodyReceived(AMQFrame frame) throws AMQException
@@ -294,8 +305,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
return _channelMap.get(channelId);
}
- public void addChannel(AMQChannel channel)
+ public void addChannel(AMQChannel channel) throws AMQException
{
+ if (_closed)
+ {
+ throw new AMQException("Session is closed");
+ }
+
_channelMap.put(channel.getChannelId(), channel);
checkForNotification();
}
@@ -339,6 +355,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
* Close a specific channel. This will remove any resources used by the channel, including:
* <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li>
* </ul>
+ *
* @param channelId id of the channel to close
* @throws AMQException if an error occurs closing the channel
* @throws IllegalArgumentException if the channel id is not valid
@@ -365,6 +382,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
/**
* In our current implementation this is used by the clustering code.
+ *
* @param channelId
*/
public void removeChannel(int channelId)
@@ -374,11 +392,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
/**
* Initialise heartbeats on the session.
+ *
* @param delay delay in seconds (not ms)
*/
public void initHeartbeats(int delay)
{
- if(delay > 0)
+ if (delay > 0)
{
_minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
_minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.getInstance().getTimeout(delay));
@@ -388,6 +407,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
/**
* Closes all channels that were opened by this protocol session. This frees up all resources
* used by the channel.
+ *
* @throws AMQException if an error occurs while closing any channel
*/
private void closeAllChannels() throws AMQException
@@ -396,6 +416,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
channel.close(this);
}
+ _channelMap.clear();
}
/**
@@ -404,7 +425,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
*/
public void closeSession() throws AMQException
{
- if(!_closed)
+ if (!_closed)
{
_closed = true;
closeAllChannels();
@@ -446,11 +467,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
// information is used by SASL primary.
if (address instanceof InetSocketAddress)
{
- return ((InetSocketAddress)address).getHostName();
+ return ((InetSocketAddress) address).getHostName();
}
else if (address instanceof VmPipeAddress)
{
- return "vmpipe:" + ((VmPipeAddress)address).getPort();
+ return "vmpipe:" + ((VmPipeAddress) address).getPort();
}
else
{
@@ -468,6 +489,16 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_saslServer = saslServer;
}
+ public FieldTable getClientProperties()
+ {
+ return _clientProperties;
+ }
+
+ public void setClientProperties(FieldTable clientProperties)
+ {
+ _clientProperties = clientProperties;
+ }
+
/**
* Convenience methods for managing AMQP version.
* NOTE: Both major and minor will be set to 0 prior to protocol initiation.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index acaf6b0d9b..a75627d240 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol;
import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.AMQException;
@@ -69,7 +70,7 @@ public interface AMQProtocolSession
* @param channel the channel to associate with this session. It is an error to
* associate the same channel with more than one session but this is not validated.
*/
- void addChannel(AMQChannel channel);
+ void addChannel(AMQChannel channel) throws AMQException;
/**
* Close a specific channel. This will remove any resources used by the channel, including:
@@ -122,4 +123,9 @@ public interface AMQProtocolSession
* @param saslServer
*/
void setSaslServer(SaslServer saslServer);
+
+
+ FieldTable getClientProperties();
+
+ void setClientProperties(FieldTable clientProperties);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
index a47d462810..d57f9b9be1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
@@ -18,6 +18,9 @@
package org.apache.qpid.server.protocol;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.MBeanConstructor;
@@ -183,33 +186,17 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
}
/**
- * @see AMQMinaProtocolSession#closeChannel(int)
- */
- public void closeChannel(int id) throws JMException
- {
- try
- {
- AMQChannel channel = _session.getChannel(id);
- if (channel == null)
- {
- throw new JMException("The channel (channel Id = " + id + ") does not exist");
- }
-
- _session.closeChannel(id);
- }
- catch (AMQException ex)
- {
- throw new MBeanException(ex, ex.toString());
- }
- }
-
- /**
* closes the connection. The administrator can use this management operation to close connection to free up
* resources.
* @throws JMException
*/
public void closeConnection() throws JMException
{
+
+ final AMQFrame response = ConnectionCloseBody.createAMQFrame(0, AMQConstant.REPLY_SUCCESS.getCode(),
+ "Broker Management Console has closing the connection.", 0, 0);
+ _session.writeFrame(response);
+
try
{
_session.closeSession();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java
index 2f3102b048..1a7b7e9e96 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java
@@ -114,15 +114,6 @@ public interface ManagedConnection
void rollbackTransactions(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId) throws JMException;
/**
- * Unsubscribes the consumers and unregisters the channel from managed objects.
- */
- @MBeanOperation(name="closeChannel",
- description="Closes the channel with given channel Id and connected consumers will be unsubscribed",
- impact= MBeanOperationInfo.ACTION)
- void closeChannel(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId)
- throws Exception;
-
- /**
* Closes all the related channels and unregisters this connection from managed objects.
*/
@MBeanOperation(name="closeConnection",
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 8f6377d80d..b7dcffe3cb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -26,10 +26,14 @@ import org.apache.qpid.framing.*;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.message.MessageDecorator;
+import org.apache.qpid.server.message.jms.JMSMessage;
import org.apache.log4j.Logger;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ConcurrentHashMap;
/**
* Combines the information that make up a deliverable message into a more manageable form.
@@ -38,6 +42,8 @@ public class AMQMessage
{
private static final Logger _log = Logger.getLogger(AMQMessage.class);
+ public static final String JMS_MESSAGE = "jms.message";
+
/**
* Used in clustering
*/
@@ -64,6 +70,9 @@ public class AMQMessage
*/
private boolean _deliveredToConsumer;
+ private ConcurrentHashMap<String, MessageDecorator> _decodedMessages;
+ private AtomicBoolean _taken;
+
private TransientMessageData _transientMessageData = new TransientMessageData();
/**
@@ -141,6 +150,8 @@ public class AMQMessage
_messageId = messageId;
_txnContext = txnContext;
_transientMessageData.setPublishBody(publishBody);
+ _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
+ _taken = new AtomicBoolean(false);
if (_log.isDebugEnabled())
{
_log.debug("Message created with id " + messageId);
@@ -358,6 +369,60 @@ public class AMQMessage
return _publisher;
}
+ /**
+ * Called selectors to determin if the message has already been sent
+ * @return _deliveredToConsumer
+ */
+ public boolean getDeliveredToConsumer()
+ {
+ return _deliveredToConsumer;
+ }
+
+
+ public MessageDecorator getDecodedMessage(String type) throws AMQException
+ {
+ MessageDecorator msgtype = null;
+
+ if (_decodedMessages != null)
+ {
+ msgtype = _decodedMessages.get(type);
+
+ if (msgtype == null)
+ {
+ msgtype = decorateMessage(type);
+ }
+ }
+
+ return msgtype;
+ }
+
+ private MessageDecorator decorateMessage(String type) throws AMQException
+ {
+ MessageDecorator msgdec = null;
+
+ if (type.equals(JMS_MESSAGE))
+ {
+ msgdec = new JMSMessage(this);
+ }
+
+ if (msgdec != null)
+ {
+ _decodedMessages.put(type, msgdec);
+ }
+
+ return msgdec;
+ }
+
+ public boolean taken()
+ {
+ return _taken.getAndSet(true);
+ }
+
+ public void release()
+ {
+ _taken.set(false);
+ }
+
public boolean checkToken(Object token)
{
if (_tokens.contains(token))
@@ -507,8 +572,7 @@ public class AMQMessage
}
//
- // Now start writing out the other content bodies
- // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
+ // Now start writing out the other content bodies
//
while (bodyFrameIterator.hasNext())
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 5f6d4c2939..7ab48598c4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -22,7 +22,7 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
@@ -96,7 +96,7 @@ public class AMQQueue implements Managable, Comparable
* max allowed number of messages on a queue.
*/
private Integer _maxMessageCount = 10000;
-
+
/**
* max queue depth(KB) for the queue
*/
@@ -188,16 +188,29 @@ public class AMQQueue implements Managable, Comparable
_subscribers = subscribers;
_subscriptionFactory = subscriptionFactory;
- //fixme - Pick one.
- if (Boolean.getBoolean("concurrentdeliverymanager"))
+ //fixme - Make this configurable via the broker config.xml
+ if (System.getProperties().getProperty("deliverymanager") != null)
{
- _logger.info("Using ConcurrentDeliveryManager");
- _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
+ if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager"))
+ {
+ _logger.info("Using ConcurrentSelectorDeliveryManager");
+ _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
+ }
+ else if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager"))
+ {
+ _logger.info("Using ConcurrentDeliveryManager");
+ _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
+ }
+ else
+ {
+ _logger.info("Using SynchronizedDeliveryManager");
+ _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
+ }
}
else
{
- _logger.info("Using SynchronizedDeliveryManager");
- _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
+ _logger.info("Using Default DeliveryManager: ConcurrentSelectorDeliveryManager");
+ _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
}
}
@@ -349,12 +362,26 @@ public class AMQQueue implements Managable, Comparable
_bindings.addBinding(routingKey, exchange);
}
- public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks)
+ public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters) throws AMQException
+ {
+ registerProtocolSession(ps, channel, consumerTag, acks, filters, false);
+ }
+
+ public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters, boolean noLocal)
throws AMQException
{
debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
- Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks);
+ Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal);
+
+ if(subscription.hasFilters())
+ {
+ if (_deliveryMgr.hasQueuedMessages())
+ {
+ _deliveryMgr.populatePreDeliveryQueue(subscription);
+ }
+ }
+
_subscribers.addSubscriber(subscription);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
index 4f6173fa2a..dda074aca7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
@@ -195,6 +195,11 @@ public class ConcurrentDeliveryManager implements DeliveryManager
return new ArrayList<AMQMessage>(_messages);
}
+ public void populatePreDeliveryQueue(Subscription subscription)
+ {
+ //no-op . This DM has no PreDeliveryQueues
+ }
+
public synchronized void removeAMessageFromTop() throws AMQException
{
AMQMessage msg = poll();
@@ -309,7 +314,6 @@ public class ConcurrentDeliveryManager implements DeliveryManager
else
{
s.send(msg, _queue);
- msg.setDeliveredToConsumer();
}
}
finally
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
new file mode 100644
index 0000000000..23e2754eb2
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -0,0 +1,388 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
+import org.apache.qpid.configuration.Configured;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.server.configuration.Configurator;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+/**
+ * Manages delivery of messages on behalf of a queue
+ */
+public class ConcurrentSelectorDeliveryManager implements DeliveryManager
+{
+ private static final Logger _log = Logger.getLogger(ConcurrentSelectorDeliveryManager.class);
+
+ @Configured(path = "advanced.compressBufferOnQueue",
+ defaultValue = "false")
+ public boolean compressBufferOnQueue;
+ /**
+ * Holds any queued messages
+ */
+ private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+ //private int _messageCount;
+ /**
+ * Ensures that only one asynchronous task is running for this manager at
+ * any time.
+ */
+ private final AtomicBoolean _processing = new AtomicBoolean();
+ /**
+ * The subscriptions on the queue to whom messages are delivered
+ */
+ private final SubscriptionManager _subscriptions;
+
+ /**
+ * A reference to the queue we are delivering messages for. We need this to be able
+ * to pass the code that handles acknowledgements a handle on the queue.
+ */
+ private final AMQQueue _queue;
+
+
+ /**
+ * Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced
+ * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered
+ * via the async thread.
+ * <p/>
+ * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue.
+ */
+ private ReentrantLock _lock = new ReentrantLock();
+
+
+ ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
+ {
+
+ //Set values from configuration
+ Configurator.configure(this);
+
+ if (compressBufferOnQueue)
+ {
+ _log.warn("Compressing Buffers on queue.");
+ }
+
+ _subscriptions = subscriptions;
+ _queue = queue;
+ }
+
+
+ private boolean addMessageToQueue(AMQMessage msg)
+ {
+ // Shrink the ContentBodies to their actual size to save memory.
+ if (compressBufferOnQueue)
+ {
+ Iterator<ContentBody> it = msg.getContentBodyIterator();
+ while (it.hasNext())
+ {
+ ContentBody cb = it.next();
+ cb.reduceBufferToFit();
+ }
+ }
+
+ _messages.offer(msg);
+
+ return true;
+ }
+
+
+ public boolean hasQueuedMessages()
+ {
+ _lock.lock();
+ try
+ {
+ return !_messages.isEmpty();
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ public int getQueueMessageCount()
+ {
+ return getMessageCount();
+ }
+
+ /**
+ * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size.
+ * The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue.
+ *
+ * @return int the number of messages in the delivery queue.
+ */
+ private int getMessageCount()
+ {
+ return _messages.size();
+ }
+
+
+ public synchronized List<AMQMessage> getMessages()
+ {
+ return new ArrayList<AMQMessage>(_messages);
+ }
+
+ public void populatePreDeliveryQueue(Subscription subscription)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Populating PreDeliveryQueue for Subscription(" + System.identityHashCode(subscription) + ")");
+ }
+
+ Iterator<AMQMessage> currentQueue = _messages.iterator();
+
+ while (currentQueue.hasNext())
+ {
+ AMQMessage message = currentQueue.next();
+ if (subscription.hasInterest(message))
+ {
+ subscription.enqueueForPreDelivery(message);
+ }
+ }
+ }
+
+ public synchronized void removeAMessageFromTop() throws AMQException
+ {
+ AMQMessage msg = poll();
+ if (msg != null)
+ {
+ msg.dequeue(_queue);
+ }
+ }
+
+ public synchronized void clearAllMessages() throws AMQException
+ {
+ AMQMessage msg = poll();
+ while (msg != null)
+ {
+ msg.dequeue(_queue);
+ msg = poll();
+ }
+ }
+
+
+ private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub)
+ {
+ AMQMessage message = messages.peek();
+
+ while (message != null && (sub.isBrowser() || message.taken()))
+ {
+ //remove the already taken message
+ messages.poll();
+ // try the next message
+ message = messages.peek();
+ }
+ return message;
+ }
+
+ public void sendNextMessage(Subscription sub, Queue<AMQMessage> messageQueue)
+ {
+ AMQMessage message = null;
+ try
+ {
+ message = getNextMessage(messageQueue, sub);
+
+ // message will be null if we have no messages in the messageQueue.
+ if (message == null)
+ {
+ return;
+ }
+ _log.info("Async Delivery Message:" + message + " to :" + sub);
+
+ sub.send(message, _queue);
+
+ //remove sent message from our queue.
+ messageQueue.poll();
+ }
+ catch (AMQException e)
+ {
+ message.release();
+ _log.error("Unable to deliver message as dequeue failed: " + e, e);
+ }
+ }
+
+ /**
+ * Only one thread should ever execute this method concurrently, but
+ * it can do so while other threads invoke deliver().
+ */
+ private void processQueue()
+ {
+ // Continue to process delivery while we haveSubscribers and messages
+ boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
+
+ while (hasSubscribers && hasQueuedMessages())
+ {
+ hasSubscribers = false;
+
+ for (Subscription sub : _subscriptions.getSubscriptions())
+ {
+ if (!sub.isSuspended())
+ {
+ sendNextMessage(sub);
+
+ hasSubscribers = true;
+ }
+ }
+ }
+ }
+
+ private void sendNextMessage(Subscription sub)
+ {
+ if (sub.hasFilters())
+ {
+ sendNextMessage(sub, sub.getPreDeliveryQueue());
+ if (sub.isAutoClose())
+ {
+ if (sub.getPreDeliveryQueue().isEmpty())
+ {
+ sub.close();
+ }
+ }
+ }
+ else
+ {
+ sendNextMessage(sub, _messages);
+ }
+ }
+
+ private AMQMessage poll()
+ {
+ return _messages.poll();
+ }
+
+ public void deliver(String name, AMQMessage msg) throws AMQException
+ {
+ _log.info(id() + "deliver :" + System.identityHashCode(msg));
+
+ //Check if we have someone to deliver the message to.
+ _lock.lock();
+ try
+ {
+ Subscription s = _subscriptions.nextSubscriber(msg);
+
+ if (s == null) //no-one can take the message right now.
+ {
+ _log.info(id() + "Testing Message(" + System.identityHashCode(msg) + ") for Queued Delivery");
+ if (!msg.getPublishBody().immediate)
+ {
+ addMessageToQueue(msg);
+
+ //release lock now message is on queue.
+ _lock.unlock();
+
+ //Pre Deliver to all subscriptions
+ _log.info(id() + "We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to.");
+ for (Subscription sub : _subscriptions.getSubscriptions())
+ {
+
+ // stop if the message gets delivered whilst PreDelivering if we have a shared queue.
+ if (_queue.isShared() && msg.getDeliveredToConsumer())
+ {
+ _log.info(id() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) + ") is already delivered.");
+ continue;
+ }
+
+ // Only give the message to those that want them.
+ if (sub.hasInterest(msg))
+ {
+ _log.info(id() + "Queuing message(" + System.identityHashCode(msg) + ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
+ sub.enqueueForPreDelivery(msg);
+ }
+ }
+ }
+ }
+ else
+ {
+ //release lock now
+ _lock.unlock();
+
+ _log.info(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + System.identityHashCode(s) + ") :" + s);
+ //Deliver the message
+ s.send(msg, _queue);
+ }
+ }
+ finally
+ {
+ //ensure lock is released
+ if (_lock.isLocked())
+ {
+ _lock.unlock();
+ }
+ }
+ }
+
+ //fixme remove
+ private final String id = "(" + String.valueOf(System.identityHashCode(this)) + ")";
+
+ private String id()
+ {
+ return id;
+ }
+
+ Runner asyncDelivery = new Runner();
+
+ private class Runner implements Runnable
+ {
+ public void run()
+ {
+ boolean running = true;
+ while (running)
+ {
+ processQueue();
+
+ //Check that messages have not been added since we did our last peek();
+ // Synchronize with the thread that adds to the queue.
+ // If the queue is still empty then we can exit
+
+ if (!(hasQueuedMessages() && _subscriptions.hasActiveSubscribers()))
+ {
+ running = false;
+ _processing.set(false);
+ }
+ }
+ }
+ }
+
+ public void processAsync(Executor executor)
+ {
+ _log.info("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
+ " Active:" + _subscriptions.hasActiveSubscribers() +
+ " Processing:" + _processing.get());
+
+ if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
+ {
+ //are we already running? if so, don't re-run
+ if (_processing.compareAndSet(false, true))
+ {
+ executor.execute(asyncDelivery);
+ }
+ }
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
index b8ba0118ab..6f31616114 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
@@ -73,4 +73,6 @@ interface DeliveryManager
void clearAllMessages() throws AMQException;
List<AMQMessage> getMessages();
+
+ void populatePreDeliveryQueue(Subscription subscription);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
index 9136264087..d04b6d3f60 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import java.util.Queue;
+
public interface Subscription
{
void send(AMQMessage msg, AMQQueue queue) throws AMQException;
@@ -29,4 +31,18 @@ public interface Subscription
boolean isSuspended();
void queueDeleted(AMQQueue queue) throws AMQException;
+
+ boolean hasFilters();
+
+ boolean hasInterest(AMQMessage msg);
+
+ Queue<AMQMessage> getPreDeliveryQueue();
+
+ void enqueueForPreDelivery(AMQMessage msg);
+
+ boolean isAutoClose();
+
+ void close();
+
+ boolean isBrowser();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
index 0fd44e4fbc..2bb77dc649 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.FieldTable;
/**
* Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This
@@ -32,9 +33,10 @@ import org.apache.qpid.AMQException;
*/
public interface SubscriptionFactory
{
- Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
- throws AMQException;
+ Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks,
+ FieldTable filters, boolean noLocal) throws AMQException;
- Subscription createSubscription(int channel, AMQProtocolSession protocolSession,String consumerTag)
- throws AMQException;
+
+ Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
+ throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index f94fd6259f..67380f024c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -22,8 +22,21 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.mina.common.ByteBuffer;
+
+import java.util.Queue;
/**
* Encapsulation of a supscription to a queue.
@@ -44,23 +57,30 @@ public class SubscriptionImpl implements Subscription
private final Object sessionKey;
+ private Queue<AMQMessage> _messages;
+
+ private final boolean _noLocal;
+
/**
* True if messages need to be acknowledged
*/
private final boolean _acks;
+ private FilterManager _filters;
+ private final boolean _isBrowser;
+ private final Boolean _autoClose;
+ private boolean _closed = false;
public static class Factory implements SubscriptionFactory
{
- public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
- throws AMQException
+ public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag, acks);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal);
}
public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false);
}
}
@@ -68,6 +88,13 @@ public class SubscriptionImpl implements Subscription
String consumerTag, boolean acks)
throws AMQException
{
+ this(channelId, protocolSession, consumerTag, acks, null, false);
+ }
+
+ public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
+ String consumerTag, boolean acks, FieldTable filters, boolean noLocal)
+ throws AMQException
+ {
AMQChannel channel = protocolSession.getChannel(channelId);
if (channel == null)
{
@@ -79,8 +106,61 @@ public class SubscriptionImpl implements Subscription
this.consumerTag = consumerTag;
sessionKey = protocolSession.getKey();
_acks = acks;
+ _noLocal = noLocal;
+
+ _filters = FilterManagerFactory.createManager(filters);
+
+
+ if (_filters != null)
+ {
+ Object isBrowser = filters.get(AMQPFilterTypes.NO_CONSUME.getValue());
+ if (isBrowser != null)
+ {
+ _isBrowser = (Boolean) isBrowser;
+ }
+ else
+ {
+ _isBrowser = false;
+ }
+ }
+ else
+ {
+ _isBrowser = false;
+ }
+
+
+ if (_filters != null)
+ {
+ Object autoClose = filters.get(AMQPFilterTypes.AUTO_CLOSE.getValue());
+ if (autoClose != null)
+ {
+ _autoClose = (Boolean) autoClose;
+ }
+ else
+ {
+ _autoClose = false;
+ }
+ }
+ else
+ {
+ _autoClose = false;
+ }
+
+
+ if (_filters != null)
+ {
+ _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+
+
+ }
+ else
+ {
+ // Reference the DeliveryManager
+ _messages = null;
+ }
}
+
public SubscriptionImpl(int channel, AMQProtocolSession protocolSession,
String consumerTag)
throws AMQException
@@ -125,6 +205,44 @@ public class SubscriptionImpl implements Subscription
{
if (msg != null)
{
+ if (_isBrowser)
+ {
+ sendToBrowser(msg, queue);
+ }
+ else
+ {
+ sendToConsumer(msg, queue);
+ }
+ }
+ else
+ {
+ _logger.error("Attempt to send Null message", new NullPointerException());
+ }
+ }
+
+ private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws AMQException
+ {
+ // We don't decrement the reference here as we don't want to consume the message
+ // but we do want to send it to the client.
+
+ synchronized(channel)
+ {
+ long deliveryTag = channel.getNextDeliveryTag();
+
+ // We don't need to add the message to the unacknowledgedMap as we don't need to know if the client
+ // received the message. If it is lost in transit that is not important.
+ if (_acks)
+ {
+ channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
+ }
+ msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag);
+ }
+ }
+
+ private void sendToConsumer(AMQMessage msg, AMQQueue queue) throws AMQException
+ {
+ try
+ {
// if we do not need to wait for client acknowledgements
// we can decrement the reference count immediately.
@@ -150,9 +268,9 @@ public class SubscriptionImpl implements Subscription
msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag);
}
}
- else
+ finally
{
- _logger.error("Attempt to send Null message", new NullPointerException());
+ msg.setDeliveredToConsumer();
}
}
@@ -170,4 +288,110 @@ public class SubscriptionImpl implements Subscription
{
channel.queueDeleted(queue);
}
+
+ public boolean hasFilters()
+ {
+ return _filters != null;
+ }
+
+ public boolean hasInterest(AMQMessage msg)
+ {
+ if (_noLocal)
+ {
+ // We don't want local messages so check to see if message is one we sent
+ if (protocolSession.getClientProperties().get(ClientProperties.instance.toString()).equals(
+ msg.getPublisher().getClientProperties().get(ClientProperties.instance.toString())))
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
+ System.identityHashCode(msg) + ")");
+ }
+ return false;
+ }
+ else // if not then filter the message.
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") local message(" + System.identityHashCode(msg) +
+ ") but not ours so filtering");
+ }
+ return checkFilters(msg);
+ }
+ }
+ else
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg));
+ }
+ return checkFilters(msg);
+ }
+ }
+
+ private boolean checkFilters(AMQMessage msg)
+ {
+ if (_filters != null)
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") has filters.");
+ }
+ return _filters.allAllow(msg);
+ }
+ else
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") has no filters");
+ }
+
+ return true;
+ }
+ }
+
+ public Queue<AMQMessage> getPreDeliveryQueue()
+ {
+ return _messages;
+ }
+
+ public void enqueueForPreDelivery(AMQMessage msg)
+ {
+ if (_messages != null)
+ {
+ _messages.offer(msg);
+ }
+ }
+
+ public boolean isAutoClose()
+ {
+ return _autoClose;
+ }
+
+ public void close()
+ {
+ if (!_closed)
+ {
+ _logger.info("Closing autoclose subscription:" + this);
+ protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), consumerTag));
+ _closed = true;
+ }
+ }
+
+ public boolean isBrowser()
+ {
+ return _isBrowser;
+ }
+
+
+ private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
+ {
+ AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag,
+ deliveryTag, false, exchange,
+ routingKey);
+ ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
+ deliverFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
index 353b461c8d..4df88baebc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
@@ -20,12 +20,15 @@
*/
package org.apache.qpid.server.queue;
+import java.util.List;
+
/**
* Abstraction of actor that will determine the subscriber to whom
* a message will be sent.
*/
public interface SubscriptionManager
{
+ public List<Subscription> getSubscriptions();
public boolean hasActiveSubscribers();
public Subscription nextSubscriber(AMQMessage msg);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
index 0de036de36..8272202571 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
@@ -60,6 +60,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
/**
* Remove the subscription, returning it if it was found
+ *
* @param subscription
* @return null if no match was found
*/
@@ -92,7 +93,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
/**
* Return the next unsuspended subscription or null if not found.
- *
+ * <p/>
* Performance note:
* This method can scan all items twice when looking for a subscription that is not
* suspended. The worst case occcurs when all subscriptions are suspended. However, it is does this
@@ -107,31 +108,51 @@ class SubscriptionSet implements WeightedSubscriptionManager
return null;
}
- try {
- final Subscription result = nextSubscriber();
- if (result == null) {
+ try
+ {
+ final Subscription result = nextSubscriberImpl(msg);
+ if (result == null)
+ {
_currentSubscriber = 0;
- return nextSubscriber();
- } else {
+ return nextSubscriberImpl(msg);
+ }
+ else
+ {
return result;
}
- } catch (IndexOutOfBoundsException e) {
+ }
+ catch (IndexOutOfBoundsException e)
+ {
_currentSubscriber = 0;
- return nextSubscriber();
+ return nextSubscriber(msg);
}
}
- private Subscription nextSubscriber()
+ private Subscription nextSubscriberImpl(AMQMessage msg)
{
final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
- while (iterator.hasNext()) {
+ while (iterator.hasNext())
+ {
Subscription subscription = iterator.next();
++_currentSubscriber;
subscriberScanned();
- if (!subscription.isSuspended()) {
- return subscription;
+
+ if (!subscription.isSuspended())
+ {
+ if (subscription.hasInterest(msg))
+ {
+ // if the queue is not empty then this client is ready to receive a message.
+ //FIXME the queue could be full of sent messages.
+ // Either need to clean all PDQs after sending a message
+ // OR have a clean up thread that runs the PDQs expunging the messages.
+ if (!subscription.hasFilters() || subscription.getPreDeliveryQueue().isEmpty())
+ {
+ return subscription;
+ }
+ }
}
}
+
return null;
}
@@ -147,11 +168,19 @@ class SubscriptionSet implements WeightedSubscriptionManager
return _subscriptions.isEmpty();
}
+ public List<Subscription> getSubscriptions()
+ {
+ return _subscriptions;
+ }
+
public boolean hasActiveSubscribers()
{
for (Subscription s : _subscriptions)
{
- if (!s.isSuspended()) return true;
+ if (!s.isSuspended())
+ {
+ return true;
+ }
}
return false;
}
@@ -161,7 +190,10 @@ class SubscriptionSet implements WeightedSubscriptionManager
int count = 0;
for (Subscription s : _subscriptions)
{
- if (!s.isSuspended()) count++;
+ if (!s.isSuspended())
+ {
+ count++;
+ }
}
return count;
}
@@ -169,6 +201,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
/**
* Notification that a queue has been deleted. This is called so that the subscription can inform the
* channel, which in turn can update its list of unacknowledged messages.
+ *
* @param queue
*/
public void queueDeleted(AMQQueue queue) throws AMQException
@@ -179,7 +212,8 @@ class SubscriptionSet implements WeightedSubscriptionManager
}
}
- int size() {
+ int size()
+ {
return _subscriptions.size();
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
index c8715f263f..7332ffbbee 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
@@ -35,7 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
class SynchronizedDeliveryManager implements DeliveryManager
{
- private static final Logger _log = Logger.getLogger(ConcurrentDeliveryManager.class);
+ private static final Logger _log = Logger.getLogger(SynchronizedDeliveryManager.class);
/**
* Holds any queued messages
@@ -124,6 +124,11 @@ class SynchronizedDeliveryManager implements DeliveryManager
return new ArrayList<AMQMessage>(_messages);
}
+ public void populatePreDeliveryQueue(Subscription subscription)
+ {
+ //no-op . This DM has no PreDeliveryQueues
+ }
+
public synchronized void removeAMessageFromTop() throws AMQException
{
AMQMessage msg = poll();
@@ -245,7 +250,6 @@ class SynchronizedDeliveryManager implements DeliveryManager
else
{
s.send(msg, _queue);
- msg.setDeliveredToConsumer();
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 8c3692a98d..7321854034 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -31,6 +31,7 @@ import org.apache.qpid.server.store.MessageStore;
import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
/**
* @author Apache Software Foundation
@@ -49,6 +50,8 @@ public class NonTransactionalContext implements TransactionalContext
*/
private final List<RequiredDeliveryException> _returnMessages;
+ private Set<Long> _browsedAcks;
+
private final MessageStore _messageStore;
/**
@@ -57,11 +60,12 @@ public class NonTransactionalContext implements TransactionalContext
private boolean _inTran;
public NonTransactionalContext(MessageStore messageStore, AMQChannel channel,
- List<RequiredDeliveryException> returnMessages)
+ List<RequiredDeliveryException> returnMessages, Set<Long> browsedAcks)
{
_channel = channel;
_returnMessages = returnMessages;
_messageStore = messageStore;
+ _browsedAcks = browsedAcks;
}
public void beginTranIfNecessary() throws AMQException
@@ -111,12 +115,19 @@ public class NonTransactionalContext implements TransactionalContext
//Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero,
// tells the server to acknowledge all outstanding mesages.
_log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" +
- unacknowledgedMessageMap.size());
+ unacknowledgedMessageMap.size());
unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
public boolean callback(UnacknowledgedMessage message) throws AMQException
{
- message.discard();
+ if (!_browsedAcks.contains(deliveryTag))
+ {
+ message.discard();
+ }
+ else
+ {
+ _browsedAcks.remove(deliveryTag);
+ }
return false;
}
@@ -137,7 +148,14 @@ public class NonTransactionalContext implements TransactionalContext
unacknowledgedMessageMap.drainTo(acked, deliveryTag);
for (UnacknowledgedMessage msg : acked)
{
- msg.discard();
+ if (!_browsedAcks.contains(deliveryTag))
+ {
+ msg.discard();
+ }
+ else
+ {
+ _browsedAcks.remove(deliveryTag);
+ }
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java b/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java
index 4767844abe..b58d551226 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java
@@ -20,10 +20,15 @@
*/
package org.apache.qpid.server.util;
+import org.apache.log4j.Logger;
+
import java.util.Iterator;
public class CircularBuffer implements Iterable
{
+
+ private static final Logger _logger = Logger.getLogger(CircularBuffer.class);
+
private final Object[] _log;
private int _size;
private int _index;
@@ -102,7 +107,7 @@ public class CircularBuffer implements Iterable
{
for(Object o : this)
{
- System.out.println(o);
+ _logger.info(o);
}
}
@@ -120,7 +125,7 @@ public class CircularBuffer implements Iterable
for(String s : items)
{
buffer.add(s);
- System.out.println(buffer);
+ _logger.info(buffer);
}
}
}
diff --git a/java/client/pom.xml b/java/client/pom.xml
index f80db8b774..73090078eb 100644
--- a/java/client/pom.xml
+++ b/java/client/pom.xml
@@ -35,7 +35,6 @@
<properties>
<topDirectoryLocation>..</topDirectoryLocation>
- <amqj.logging.level>warn</amqj.logging.level>
</properties>
<dependencies>
@@ -98,6 +97,11 @@
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemProperties>
diff --git a/java/client/src/log4j.properties b/java/client/src/log4j.properties
index d3135ff574..5614cb76f3 100644
--- a/java/client/src/log4j.properties
+++ b/java/client/src/log4j.properties
@@ -1,10 +1,10 @@
-log4j.rootLogger=${root.logging.level}
+log4j.rootLogger=${amqj.logging.level}
log4j.logger.org.apache.qpid=${amqj.logging.level}, console
log4j.additivity.org.apache.qpid=false
log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.Threshold=info
+log4j.appender.console.Threshold=debug
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
diff --git a/java/client/src/main/java/log4j.properties b/java/client/src/main/java/log4j.properties
index db8f43cb3b..a4b4d3ed6c 100644
--- a/java/client/src/main/java/log4j.properties
+++ b/java/client/src/main/java/log4j.properties
@@ -16,10 +16,10 @@
# specific language governing permissions and limitations
# under the License.
#
-log4j.rootLogger=WARN
+log4j.rootLogger=${amqj.logging.level}
-log4j.logger.org.apache.qpid=WARN, console
+log4j.logger.org.apache.qpid=${amqj.logging.level}, console
log4j.additivity.org.apache.qpid=false
log4j.appender.console=org.apache.log4j.ConsoleAppender
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
new file mode 100644
index 0000000000..5c753946a6
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
@@ -0,0 +1,128 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.log4j.Logger;
+
+import java.util.Enumeration;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+
+public class AMQQueueBrowser implements QueueBrowser
+{
+ private static final Logger _logger = Logger.getLogger(AMQQueueBrowser.class);
+
+
+ private AtomicBoolean _isClosed = new AtomicBoolean();
+ private final AMQSession _session;
+ private final AMQQueue _queue;
+ private final ArrayList<BasicMessageConsumer> _consumers = new ArrayList<BasicMessageConsumer>();
+ private final String _messageSelector;
+
+
+ AMQQueueBrowser(AMQSession session, AMQQueue queue, String messageSelector) throws JMSException
+ {
+ _session = session;
+ _queue = queue;
+ _messageSelector = (messageSelector == null) || (messageSelector.trim().length() == 0) ? null : messageSelector;
+ BasicMessageConsumer consumer = (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
+ consumer.close();
+ }
+
+ public Queue getQueue() throws JMSException
+ {
+ checkState();
+ return _queue;
+ }
+
+ private void checkState() throws JMSException
+ {
+ if (_isClosed.get())
+ {
+ throw new IllegalStateException("Queue Browser");
+ }
+ if (_session.isClosed())
+ {
+ throw new IllegalStateException("Session is closed");
+ }
+
+ }
+
+ public String getMessageSelector() throws JMSException
+ {
+
+ checkState();
+ return _messageSelector;
+ }
+
+ public Enumeration getEnumeration() throws JMSException
+ {
+ checkState();
+ final BasicMessageConsumer consumer = (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
+ _consumers.add(consumer);
+
+ return new Enumeration()
+ {
+
+
+ Message _nextMessage = consumer.receive();
+
+
+ public boolean hasMoreElements()
+ {
+ _logger.info("QB:hasMoreElements:" + (_nextMessage != null));
+ return (_nextMessage != null);
+ }
+
+ public Object nextElement()
+ {
+ Message msg = _nextMessage;
+ try
+ {
+ _logger.info("QB:nextElement about to receive");
+
+ _nextMessage = consumer.receive();
+ _logger.info("QB:nextElement received:" + _nextMessage);
+ }
+ catch (JMSException e)
+ {
+ _logger.warn("Exception caught while queue browsing", e);
+ _nextMessage = null;
+ }
+
+ return msg;
+ }
+ };
+ }
+
+ public void close() throws JMSException
+ {
+ for (BasicMessageConsumer consumer : _consumers)
+ {
+ consumer.close();
+ }
+ _consumers.clear();
+ }
+
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index fae5e7ac08..2136d565f1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -23,13 +23,15 @@ package org.apache.qpid.client;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.*;
@@ -69,15 +71,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
/**
- * Used to reference durable subscribers so they requests for unsubscribe can be handled
- * correctly. Note this only keeps a record of subscriptions which have been created
- * in the current instance. It does not remember subscriptions between executions of the
- * client
+ * Used to reference durable subscribers so they requests for unsubscribe can be handled
+ * correctly. Note this only keeps a record of subscriptions which have been created
+ * in the current instance. It does not remember subscriptions between executions of the
+ * client
*/
private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
- new ConcurrentHashMap<BasicMessageConsumer, String>();
+ new ConcurrentHashMap<BasicMessageConsumer, String>();
/**
* Used in the consume method. We generate the consume tag on the client so that we can use the nowait
@@ -143,6 +145,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private boolean _inRecovery;
+
/**
* Responsible for decoding a message fragment and passing it to the appropriate message consumer.
*/
@@ -176,7 +179,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
if (message.deliverBody != null)
{
- final BasicMessageConsumer consumer = _consumers.get(message.deliverBody.consumerTag);
+ final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.deliverBody.consumerTag);
if (consumer == null)
{
@@ -210,17 +213,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
_connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
}
+ else if (errorCode == AMQConstant.NO_ROUTE.getCode())
+ {
+ _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
+ }
else
{
- if (errorCode == AMQConstant.NO_ROUTE.getCode())
- {
- _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
- }
- else
- {
- _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
- }
+ _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
}
+
}
catch (Exception e)
{
@@ -318,7 +319,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public BytesMessage createBytesMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -334,7 +335,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public MapMessage createMapMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -350,7 +351,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public javax.jms.Message createMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -366,7 +367,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public ObjectMessage createObjectMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -382,7 +383,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public ObjectMessage createObjectMessage(Serializable object) throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -400,7 +401,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public StreamMessage createStreamMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
@@ -417,7 +418,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TextMessage createTextMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
@@ -434,7 +435,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TextMessage createTextMessage(String text) throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -504,7 +505,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
// We must close down all producers and consumers in an orderly fashion. This is the only method
// that can be called from a different thread of control from the one controlling the session
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
//Ensure we only try and close an open session.
if (!_closed.getAndSet(true))
@@ -569,7 +570,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public void closed(Throwable e)
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
// An AMQException has an error code and message already and will be passed in when closure occurs as a
// result of a channel close request
@@ -721,11 +722,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void acknowledge() throws JMSException
{
- if(isClosed())
+ if (isClosed())
{
throw new IllegalStateException("Session is already closed");
}
- for(BasicMessageConsumer consumer : _consumers.values())
+ for (BasicMessageConsumer consumer : _consumers.values())
{
consumer.acknowledge();
}
@@ -734,7 +735,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
-
public MessageListener getMessageListener() throws JMSException
{
checkNotClosed();
@@ -843,7 +843,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
false,
false,
null,
- null);
+ null,
+ false,
+ false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
@@ -855,7 +857,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
false,
false,
messageSelector,
- null);
+ null,
+ false,
+ false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
@@ -868,7 +872,26 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
noLocal,
false,
messageSelector,
- null);
+ null,
+ false,
+ false);
+ }
+
+ public MessageConsumer createBrowserConsumer(Destination destination,
+ String messageSelector,
+ boolean noLocal)
+ throws JMSException
+ {
+ checkValidDestination(destination);
+ return createConsumerImpl(destination,
+ _defaultPrefetchHighMark,
+ _defaultPrefetchLowMark,
+ noLocal,
+ false,
+ messageSelector,
+ null,
+ true,
+ true);
}
public MessageConsumer createConsumer(Destination destination,
@@ -878,7 +901,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
String selector) throws JMSException
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null);
+ return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false);
}
@@ -890,7 +913,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
String selector) throws JMSException
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null);
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, false, false);
}
public MessageConsumer createConsumer(Destination destination,
@@ -902,7 +925,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkValidDestination(destination);
return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive,
- selector, rawSelector);
+ selector, rawSelector, false, false);
}
public MessageConsumer createConsumer(Destination destination,
@@ -915,7 +938,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkValidDestination(destination);
return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive,
- selector, rawSelector);
+ selector, rawSelector, false, false);
}
protected MessageConsumer createConsumerImpl(final Destination destination,
@@ -924,7 +947,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
final boolean noLocal,
final boolean exclusive,
final String selector,
- final FieldTable rawSelector) throws JMSException
+ final FieldTable rawSelector,
+ final boolean noConsume,
+ final boolean autoClose) throws JMSException
{
checkTemporaryDestination(destination);
@@ -948,12 +973,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal,
_messageFactoryRegistry, AMQSession.this,
protocolHandler, ft, prefetchHigh, prefetchLow, exclusive,
- _acknowledgeMode);
+ _acknowledgeMode, noConsume, autoClose);
try
{
registerConsumer(consumer, false);
}
+ catch (AMQInvalidSelectorException ise)
+ {
+ JMSException ex = new InvalidSelectorException(ise.getMessage());
+ ex.setLinkedException(ise);
+ throw ex;
+ }
catch (AMQException e)
{
JMSException ex = new JMSException("Error registering consumer: " + e);
@@ -963,7 +994,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
synchronized(destination)
{
- _destinationConsumerCount.putIfAbsent(destination,new AtomicInteger());
+ _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger());
_destinationConsumerCount.get(destination).incrementAndGet();
}
@@ -975,16 +1006,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private void checkTemporaryDestination(Destination destination)
throws JMSException
{
- if((destination instanceof TemporaryDestination))
+ if ((destination instanceof TemporaryDestination))
{
_logger.debug("destination is temporary");
final TemporaryDestination tempDest = (TemporaryDestination) destination;
- if(tempDest.getSession() != this)
+ if (tempDest.getSession() != this)
{
_logger.debug("destination is on different session");
throw new JMSException("Cannot consume from a temporary destination created onanother session");
}
- if(tempDest.isDeleted())
+ if (tempDest.isDeleted())
{
_logger.debug("destination is deleted");
throw new JMSException("Cannot consume from a deleted destination");
@@ -1065,12 +1096,26 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @return the consumer tag generated by the broker
*/
private void consumeFromQueue(BasicMessageConsumer consumer, String queueName, AMQProtocolHandler protocolHandler,
- boolean nowait) throws AMQException
+ boolean nowait, String messageSelector) throws AMQException
{
//fixme prefetch values are not used here. Do we need to have them as parametsrs?
//need to generate a consumer tag on the client so we can exploit the nowait flag
String tag = Integer.toString(_nextTag++);
+ FieldTable arguments = FieldTableFactory.newFieldTable();
+ if (messageSelector != null && !messageSelector.equals(""))
+ {
+ arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
+ }
+ if(consumer.isAutoClose())
+ {
+ arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
+ }
+ if(consumer.isNoConsume())
+ {
+ arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
+ }
+
consumer.setConsumerTag(tag);
// we must register the consumer in the map before we actually start listening
_consumers.put(tag, consumer);
@@ -1080,7 +1125,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
queueName, tag, consumer.isNoLocal(),
consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
- consumer.isExclusive(), nowait, null);
+ consumer.isExclusive(), nowait, arguments);
if (nowait)
{
protocolHandler.writeFrame(jmsConsume);
@@ -1220,7 +1265,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkNotClosed();
checkValidTopic(topic);
- AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic)topic, name, _connection);
+ AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
if (subscriber != null)
{
@@ -1247,8 +1292,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
- _subscriptions.put(name,subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name);
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
return subscriber;
}
@@ -1278,8 +1323,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
- _subscriptions.put(name,subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name);
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
return subscriber;
}
@@ -1291,16 +1336,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public QueueBrowser createBrowser(Queue queue) throws JMSException
{
- checkNotClosed();
- checkValidQueue(queue);
- throw new UnsupportedOperationException("Queue browsing not supported");
+ return createBrowser(queue, null);
}
public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
{
checkNotClosed();
checkValidQueue(queue);
- throw new UnsupportedOperationException("Queue browsing not supported");
+ return new AMQQueueBrowser(this, (AMQQueue) queue,messageSelector);
}
public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -1476,7 +1519,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
- consumeFromQueue(consumer, queueName, protocolHandler, nowait);
+ try
+ {
+ consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector());
+ }
+ catch (JMSException e) //thrown by getMessageSelector
+ {
+ throw new AMQException(e.getMessage(), e);
+ }
}
/**
@@ -1489,7 +1539,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
_consumers.remove(consumer.getConsumerTag());
String subscriptionName = _reverseSubscriptionMap.remove(consumer);
- if(subscriptionName != null)
+ if (subscriptionName != null)
{
_subscriptions.remove(subscriptionName);
}
@@ -1497,7 +1547,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
Destination dest = consumer.getDestination();
synchronized(dest)
{
- if(_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+ if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
{
_destinationConsumerCount.remove(dest);
}
@@ -1567,6 +1617,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_connection.getProtocolHandler().writeFrame(channelFlowFrame);
}
+ public void confirmConsumerCancelled(String consumerTag)
+ {
+ BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
+ if((consumer != null) && (consumer.isAutoClose()))
+ {
+ consumer.closeWhenNoMessages(true);
+ }
+ }
+
+
/*
* I could have combined the last 3 methods, but this way it improves readability
*/
@@ -1576,7 +1636,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
throw new javax.jms.InvalidDestinationException("Invalid Topic");
}
- if((topic instanceof TemporaryDestination) && ((TemporaryDestination)topic).getSession() != this)
+ if ((topic instanceof TemporaryDestination) && ((TemporaryDestination) topic).getSession() != this)
{
throw new JMSException("Cannot create a subscription on a temporary topic created in another session");
}
@@ -1597,4 +1657,5 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
throw new javax.jms.InvalidDestinationException("Invalid Queue");
}
}
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index f0d3cf5abc..cefaca8d52 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -145,10 +145,19 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
*/
private Thread _receivingThread;
+ /**
+ * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive
+ * on the queue. This is used for queue browsing.
+ */
+ private boolean _autoClose;
+ private boolean _closeWhenNoMessages;
+
+ private boolean _noConsume;
+
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector,
- boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
- AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable,
- int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode)
+ boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
+ AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable,
+ int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
{
_channelId = channelId;
_connection = connection;
@@ -164,6 +173,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_exclusive = exclusive;
_acknowledgeMode = acknowledgeMode;
_synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true);
+ _autoClose = autoClose;
+ _noConsume = noConsume;
}
public AMQDestination getDestination()
@@ -321,6 +332,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
+ if(closeOnAutoClose())
+ {
+ return null;
+ }
Object o = null;
if (l > 0)
{
@@ -350,6 +365,19 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
+ private boolean closeOnAutoClose() throws JMSException
+ {
+ if(isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty())
+ {
+ close(false);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
public Message receiveNoWait() throws JMSException
{
checkPreConditions();
@@ -358,6 +386,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
+ if(closeOnAutoClose())
+ {
+ return null;
+ }
Object o = _synchronousQueue.poll();
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
@@ -402,22 +434,31 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
+
public void close() throws JMSException
{
+ close(true);
+ }
+
+ public void close(boolean sendClose) throws JMSException
+ {
synchronized(_connection.getFailoverMutex())
{
if (!_closed.getAndSet(true))
{
- final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false);
-
- try
- {
- _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
- }
- catch (AMQException e)
+ if(sendClose)
{
- _logger.error("Error closing consumer: " + e, e);
- throw new JMSException("Error closing consumer: " + e);
+ final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false);
+
+ try
+ {
+ _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Error closing consumer: " + e, e);
+ throw new JMSException("Error closing consumer: " + e);
+ }
}
deregisterConsumer();
@@ -513,6 +554,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
msg.setJMSDestination(_destination);
switch (_acknowledgeMode)
{
+ case Session.CLIENT_ACKNOWLEDGE:
+ if (isNoConsume())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+ break;
case Session.DUPS_OK_ACKNOWLEDGE:
if (++_outstanding >= _prefetchHigh)
{
@@ -539,7 +586,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
break;
case Session.SESSION_TRANSACTED:
- _lastDeliveryTag = msg.getDeliveryTag();
+ if (isNoConsume())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+ else
+ {
+ _lastDeliveryTag = msg.getDeliveryTag();
+ }
break;
}
}
@@ -630,4 +684,29 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_unacknowledgedDeliveryTags.clear();
}
+
+ public boolean isAutoClose()
+ {
+ return _autoClose;
+ }
+
+
+ public boolean isNoConsume()
+ {
+ return _noConsume;
+ }
+
+ public void closeWhenNoMessages(boolean b)
+ {
+ _closeWhenNoMessages = b;
+
+ if(_closeWhenNoMessages
+ && _synchronousQueue.isEmpty()
+ && _receiving.get()
+ && _messageListener != null)
+ {
+ _receivingThread.interrupt();
+ }
+
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java b/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
index 6ab7808110..9ee802ff10 100644
--- a/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
+++ b/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
@@ -20,16 +20,16 @@
*/
package org.apache.qpid.client;
-import java.util.Enumeration;
+import org.apache.qpid.common.QpidProperties;
import javax.jms.ConnectionMetaData;
import javax.jms.JMSException;
+import java.util.Enumeration;
public class QpidConnectionMetaData implements ConnectionMetaData
{
-
QpidConnectionMetaData(AMQConnection conn)
{
}
@@ -46,7 +46,7 @@ public class QpidConnectionMetaData implements ConnectionMetaData
public String getJMSProviderName() throws JMSException
{
- return "Apache Qpid";
+ return "Apache " + QpidProperties.getProductName();
}
public String getJMSVersion() throws JMSException
@@ -71,8 +71,8 @@ public class QpidConnectionMetaData implements ConnectionMetaData
public String getProviderVersion() throws JMSException
{
- return "QPID (Client: [" + getClientVersion() + "] ; Broker [" + getBrokerVersion() + "] ; Protocol: [ "
- + getProtocolVersion() + "] )";
+ return QpidProperties.getProductName() + " (Client: [" + getClientVersion() + "] ; Broker [" + getBrokerVersion() + "] ; Protocol: [ "
+ + getProtocolVersion() + "] )";
}
private String getProtocolVersion()
@@ -89,8 +89,7 @@ public class QpidConnectionMetaData implements ConnectionMetaData
public String getClientVersion()
{
- // TODO - get client build version from properties file or similar
- return "<unknown>";
+ return QpidProperties.getBuildVersion();
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
new file mode 100644
index 0000000000..d855e97204
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
@@ -0,0 +1,35 @@
+package org.apache.qpid.client.handler;
+
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.BasicMessageConsumer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ExchangeBoundOkBody;
+import org.apache.qpid.framing.BasicCancelOkBody;
+import org.apache.log4j.Logger;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class BasicCancelOkMethodHandler implements StateAwareMethodListener
+{
+ private static final Logger _logger = Logger.getLogger(BasicCancelOkMethodHandler.class);
+ private static final BasicCancelOkMethodHandler _instance = new BasicCancelOkMethodHandler();
+
+ public static BasicCancelOkMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private BasicCancelOkMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ {
+ _logger.debug("New BasicCancelOk method received");
+ BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod();
+ evt.getProtocolSession().confirmConsumerCancelled(evt.getChannelId(), body.consumerTag);
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
index 2bd93f1508..fd2968cdfd 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
@@ -23,6 +23,7 @@ package org.apache.qpid.client.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQChannelClosedException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.qpid.client.AMQNoConsumersException;
import org.apache.qpid.client.AMQNoRouteException;
import org.apache.qpid.protocol.AMQConstant;
@@ -46,7 +47,7 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener
public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
- _logger.debug("ChannelClose method received");
+ _logger.debug("ChannelClose method received");
ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
int errorCode = method.replyCode;
@@ -65,17 +66,21 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener
{
throw new AMQNoConsumersException("Error: " + reason, null);
}
+ else if (errorCode == AMQConstant.NO_ROUTE.getCode())
+ {
+ throw new AMQNoRouteException("Error: " + reason, null);
+ }
+ else if (errorCode == AMQConstant.INVALID_SELECTOR.getCode())
+ {
+ _logger.info("Broker responded with Invalid Selector.");
+
+ throw new AMQInvalidSelectorException(reason);
+ }
else
{
- if (errorCode == AMQConstant.NO_ROUTE.getCode())
- {
- throw new AMQNoRouteException("Error: " + reason, null);
- }
- else
- {
- throw new AMQChannelClosedException(errorCode, "Error: " + reason);
- }
+ throw new AMQChannelClosedException(errorCode, "Error: " + reason);
}
+
}
evt.getProtocolSession().channelClosed(evt.getChannelId(), errorCode, reason);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
index 9333df3fe4..1e0366ec4d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
@@ -22,6 +22,8 @@ package org.apache.qpid.client.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.security.AMQCallbackHandler;
@@ -119,10 +121,12 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
FieldTable clientProperties = FieldTableFactory.newFieldTable();
- clientProperties.put("instance", ps.getClientID());
- clientProperties.put("product", "Qpid");
- clientProperties.put("version", "1.0");
- clientProperties.put("platform", getFullSystemInfo());
+
+ clientProperties.put(ClientProperties.instance.toString(), ps.getClientID());
+ _log.info("Product name: " + QpidProperties.getProductName());
+ clientProperties.put(ClientProperties.product.toString(), QpidProperties.getProductName());
+ clientProperties.put(ClientProperties.version.toString(), QpidProperties.getReleaseVersion());
+ clientProperties.put(ClientProperties.platform.toString(), getFullSystemInfo());
ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), clientProperties, mechanism,
saslResponse, selectedLocale));
}
@@ -130,6 +134,10 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
{
throw new AMQException(_log, "Unable to decode data: " + e, e);
}
+ catch (Throwable t)
+ {
+ _log.error("Error: " + t, t);
+ }
}
private String getFullSystemInfo()
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index fea7a29594..40d8b28411 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -26,7 +26,8 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.client.*;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.BasicMessageConsumer;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
@@ -46,7 +47,8 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
protected ByteBuffer _data;
private boolean _readableProperties = false;
- private boolean _readableMessage = false;
+ protected boolean _readableMessage = false;
+ protected boolean _changedData;
private Destination _destination;
private BasicMessageConsumer _consumer;
@@ -60,6 +62,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
}
_readableProperties = false;
_readableMessage = (data != null);
+ _changedData = (data == null);
}
protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) throws AMQException
@@ -521,16 +524,16 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
return !_readableMessage;
}
- public void reset()
+ public void reset()
{
- if (_readableMessage)
+ if (!_changedData)
{
_data.rewind();
}
else
{
_data.flip();
- _readableMessage = true;
+ _changedData = false;
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
index f5c9f7111a..d769300c69 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
@@ -59,6 +59,12 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag
super(messageNbr, contentHeader, data);
}
+ public void reset()
+ {
+ super.reset();
+ _readableMessage = true;
+ }
+
public String getMimeType()
{
return MIME_TYPE;
@@ -226,48 +232,56 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag
public void writeBoolean(boolean b) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.put(b ? (byte) 1 : (byte) 0);
}
public void writeByte(byte b) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.put(b);
}
public void writeShort(short i) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.putShort(i);
}
public void writeChar(char c) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.putChar(c);
}
public void writeInt(int i) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.putInt(i);
}
public void writeLong(long l) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.putLong(l);
}
public void writeFloat(float v) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.putFloat(v);
}
public void writeDouble(double v) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.putDouble(v);
}
@@ -281,7 +295,7 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag
_data.putShort((short)encodedString.limit());
_data.put(encodedString);
-
+ _changedData = true;
//_data.putString(string, Charset.forName("UTF-8").newEncoder());
// we must add the null terminator manually
//_data.put((byte)0);
@@ -298,12 +312,14 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag
{
checkWritable();
_data.put(bytes);
+ _changedData = true;
}
public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
{
checkWritable();
_data.put(bytes, offset, length);
+ _changedData = true;
}
public void writeObject(Object object) throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
index 4fb070d2ff..35c5377f14 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
@@ -112,7 +112,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
}
}
-
+
public Serializable getObject() throws JMSException
{
ObjectInputStream in = null;
@@ -123,18 +123,18 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
try
{
- _data.rewind();
+ _data.rewind();
in = new ObjectInputStream(_data.asInputStream());
return (Serializable) in.readObject();
}
catch (IOException e)
- {
- e.printStackTrace();
- throw new MessageFormatException("Could not deserialize message: " + e);
+ {
+ e.printStackTrace();
+ throw new MessageFormatException("Could not deserialize message: " + e);
}
catch (ClassNotFoundException e)
{
- e.printStackTrace();
+ e.printStackTrace();
throw new MessageFormatException("Could not deserialize message: " + e);
}
finally
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
index c2dfdc1b65..6709ff802d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
@@ -86,6 +86,12 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
super(messageNbr, contentHeader, data);
}
+ public void reset()
+ {
+ super.reset();
+ _readableMessage = true;
+ }
+
public String getMimeType()
{
return MIME_TYPE;
@@ -103,6 +109,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
{
checkWritable();
_data.put(type);
+ _changedData = true;
}
public boolean readBoolean() throws JMSException
@@ -693,7 +700,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
{
_data.putString(string, Charset.forName("UTF-8").newEncoder());
// we must write the null terminator ourselves
- _data.put((byte)0);
+ _data.put((byte) 0);
}
catch (CharacterCodingException e)
{
@@ -706,7 +713,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public void writeBytes(byte[] bytes) throws JMSException
{
- writeBytes(bytes, 0, bytes == null?0:bytes.length);
+ writeBytes(bytes, 0, bytes == null ? 0 : bytes.length);
}
public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
index 76f8a1c32f..d8394b0489 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
@@ -117,6 +117,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
{
_data.put(text.getBytes(getJmsContentHeaderProperties().getEncoding()));
}
+ _changedData=true;
}
_decodedValue = text;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index a4ed89719b..6a40fd3133 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -406,4 +406,12 @@ public class AMQProtocolSession implements ProtocolVersionList
HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
}
}
+
+ public void confirmConsumerCancelled(int channelId, String consumerTag)
+ {
+ final Integer chId = channelId;
+ final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId);
+
+ session.confirmConsumerCancelled(consumerTag);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
index 492571b6af..21ae3fc71f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
@@ -110,7 +110,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
}
else
{
- throw new AMQException("Woken up due to exception", _error); // FIXME: This will wrap FailoverException and prevent it being caught.
+ throw new AMQException("Woken up due to " + _error.getClass(), _error); // FIXME: This will wrap FailoverException and prevent it being caught.
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index 887850c06e..50bd1667f9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -103,6 +103,7 @@ public class AMQStateManager implements AMQMethodListener
frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance());
frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance());
+ frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance());
frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance());
frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance());
frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance());
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
index 0de2850080..d6364f45b0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
@@ -59,7 +59,7 @@ public class SocketTransportConnection implements ITransportConnection
// once more testing of the performance of the simple allocator has been done
if (!Boolean.getBoolean("amqj.enablePooledAllocator"))
{
- _logger.warn("Using SimpleByteBufferAllocator");
+ _logger.info("Using SimpleByteBufferAllocator");
ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
}
diff --git a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
index 53e7fd066e..5497cafed4 100644
--- a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
@@ -40,11 +40,15 @@ import javax.naming.spi.InitialContextFactory;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
public class PropertiesFileInitialContextFactory implements InitialContextFactory
{
- protected final Logger _logger = Logger.getLogger(getClass());
+ protected final Logger _logger = Logger.getLogger(PropertiesFileInitialContextFactory.class);
private String CONNECTION_FACTORY_PREFIX = "connectionfactory.";
private String DESTINATION_PREFIX = "destination.";
@@ -55,6 +59,41 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor
{
Map data = new ConcurrentHashMap();
+ try
+ {
+
+ String file = null;
+ if (environment.contains(Context.PROVIDER_URL))
+ {
+ file = (String) environment.get(Context.PROVIDER_URL);
+ }
+ else
+ {
+ file = System.getProperty(Context.PROVIDER_URL);
+ }
+
+ if (file != null)
+ {
+ _logger.info("Loading Properties from:" + file);
+ //Load the properties specified
+ Properties p = new Properties();
+
+ p.load(new BufferedInputStream(new FileInputStream(file)));
+
+ environment.putAll(p);
+ _logger.info("Loaded Context Properties:" + environment.toString());
+ }
+ else
+ {
+ _logger.warn("No Provider URL specified.");
+ }
+ }
+ catch (IOException ioe)
+ {
+ _logger.warn("Unable to load property file specified in Provider_URL:" +
+ environment.get(Context.PROVIDER_URL));
+ }
+
createConnectionFactories(data, environment);
createDestinations(data, environment);
diff --git a/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java b/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java
index a1e15258c3..f7a1502347 100644
--- a/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java
+++ b/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java
@@ -29,6 +29,7 @@ import javax.naming.Context;
import javax.jms.*;
import javax.jms.MessageConsumer;
import javax.jms.Session;
+import javax.jms.Message;
import java.util.Hashtable;
import java.io.File;
import java.io.FilenameFilter;
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
index 7f76baa157..17679788bd 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
@@ -116,7 +116,9 @@ public class PropertyValueTest extends TestCase implements MessageListener
m.setIntProperty("Int", (int) Integer.MAX_VALUE);
m.setJMSCorrelationID("Correlation");
- m.setJMSPriority(100);
+ //fixme the m.setJMSMessage has no effect
+ producer.setPriority(8);
+ m.setJMSPriority(3);
// Queue
Queue q;
@@ -182,10 +184,8 @@ public class PropertyValueTest extends TestCase implements MessageListener
(int) Integer.MAX_VALUE, m.getIntProperty("Int"));
Assert.assertEquals("Check CorrelationID properties are correctly transported",
"Correlation", m.getJMSCorrelationID());
-
- _logger.warn("getJMSPriority not being verified.");
-// Assert.assertEquals("Check Priority properties are correctly transported",
-// 100, m.getJMSPriority());
+ Assert.assertEquals("Check Priority properties are correctly transported",
+ 8, m.getJMSPriority());
// Queue
Assert.assertEquals("Check ReplyTo properties are correctly transported",
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
new file mode 100644
index 0000000000..27a2ccb32e
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.basic;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.BasicMessageProducer;
+import org.apache.qpid.client.transport.TransportConnection;
+
+import org.apache.log4j.Logger;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.DeliveryMode;
+
+import junit.framework.TestCase;
+
+public class SelectorTest extends TestCase implements MessageListener
+{
+
+ private final static Logger _logger = org.apache.log4j.Logger.getLogger(SelectorTest.class);
+
+ private AMQConnection _connection;
+ private AMQDestination _destination;
+ private AMQSession _session;
+ private int count;
+ public String _connectionString = "vm://:1";
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ }
+
+ private void init(AMQConnection connection) throws Exception
+ {
+ init(connection, new AMQQueue(randomize("SessionStartTest"), true));
+ }
+
+ private void init(AMQConnection connection, AMQDestination destination) throws Exception
+ {
+ _connection = connection;
+ _destination = destination;
+ connection.start();
+
+
+ String selector = null;
+// selector = "Cost = 2 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT;
+// selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT;
+
+ _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ //_session.createConsumer(destination).setMessageListener(this);
+ _session.createConsumer(destination, selector).setMessageListener(this);
+ }
+
+ public synchronized void test() throws JMSException, InterruptedException
+ {
+ try
+ {
+ Message msg = _session.createTextMessage("Message");
+ msg.setJMSPriority(1);
+ msg.setIntProperty("Cost", 2);
+ msg.setJMSType("Special");
+
+ _logger.info("Sending Message:" + msg);
+
+ ((BasicMessageProducer) _session.createProducer(_destination)).send(msg, DeliveryMode.NON_PERSISTENT);
+ System.out.println("Message sent, waiting for response...");
+ wait(1000);
+
+ if (count > 0)
+ {
+ _logger.info("Got message");
+ }
+
+ if (count == 0)
+ {
+ fail("Did not get message!");
+ //throw new RuntimeException("Did not get message!");
+ }
+ }
+ finally
+ {
+ _session.close();
+ _connection.close();
+ }
+ }
+
+ public synchronized void onMessage(Message message)
+ {
+ count++;
+ _logger.info("Got Message:" + message);
+ notify();
+ }
+
+ private static String randomize(String in)
+ {
+ return in + System.currentTimeMillis();
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ SelectorTest test = new SelectorTest();
+ test._connectionString = argv.length == 0 ? "localhost:5672" : argv[0];
+ test.setUp();
+ test.test();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(SelectorTest.class);
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
index 79bd4f6dde..5bce3f64a2 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
@@ -121,7 +121,7 @@ public class ChannelCloseOkTest extends TestCase
{
if (_connection != null)
{
- System.out.println(">>>>>>>>>>>>>>.. closing");
+ _log.info(">>>>>>>>>>>>>>.. closing");
_connection.close();
}
}
@@ -137,7 +137,7 @@ public class ChannelCloseOkTest extends TestCase
{
public void onException(JMSException jmsException)
{
- _log.error("onException - ", jmsException);
+ _log.warn("onException - "+jmsException.getMessage());
}
});
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
index 0005b20fb1..3022c8a59d 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
@@ -20,6 +20,8 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.qpid.server.cluster.util.LogMessage;
+import java.util.List;
+
class ClusteredSubscriptionManager extends SubscriptionSet
{
private static final Logger _logger = Logger.getLogger(ClusteredSubscriptionManager.class);
@@ -79,6 +81,11 @@ class ClusteredSubscriptionManager extends SubscriptionSet
return ClusteredSubscriptionManager.this.getWeight();
}
+ public List<Subscription> getSubscriptions()
+ {
+ return ClusteredSubscriptionManager.super.getSubscriptions();
+ }
+
public boolean hasActiveSubscribers()
{
return ClusteredSubscriptionManager.super.hasActiveSubscribers();
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
index 0bb6537930..eec2563480 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
@@ -18,12 +18,12 @@
package org.apache.qpid.server.queue;
import java.util.List;
+import java.util.LinkedList;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* Distributes messages among a list of subsscription managers, using their
* weighting.
- *
*/
class NestedSubscriptionManager implements SubscriptionManager
{
@@ -41,11 +41,24 @@ class NestedSubscriptionManager implements SubscriptionManager
_subscribers.remove(s);
}
+
+ public List<Subscription> getSubscriptions()
+ {
+ List<Subscription> allSubs = new LinkedList<Subscription>();
+
+ for (WeightedSubscriptionManager subMans : _subscribers)
+ {
+ allSubs.addAll(subMans.getSubscriptions());
+ }
+
+ return allSubs;
+ }
+
public boolean hasActiveSubscribers()
{
- for(WeightedSubscriptionManager s : _subscribers)
+ for (WeightedSubscriptionManager s : _subscribers)
{
- if(s.hasActiveSubscribers())
+ if (s.hasActiveSubscribers())
{
return true;
}
@@ -56,9 +69,9 @@ class NestedSubscriptionManager implements SubscriptionManager
public Subscription nextSubscriber(AMQMessage msg)
{
WeightedSubscriptionManager start = current();
- for(WeightedSubscriptionManager s = start; s != null; s = next(start))
+ for (WeightedSubscriptionManager s = start; s != null; s = next(start))
{
- if(hasMore(s))
+ if (hasMore(s))
{
return nextSubscriber(s);
}
@@ -91,7 +104,7 @@ class NestedSubscriptionManager implements SubscriptionManager
private WeightedSubscriptionManager next()
{
_iterations = 0;
- if(++_index >= _subscribers.size())
+ if (++_index >= _subscribers.size())
{
_index = 0;
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
index eabf374e81..06dfd29cab 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
@@ -22,6 +22,9 @@ import org.apache.qpid.server.cluster.GroupManager;
import org.apache.qpid.server.cluster.SimpleSendable;
import org.apache.qpid.AMQException;
+import java.util.Queue;
+import java.util.List;
+
class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManager
{
private final GroupManager _groupMgr;
@@ -73,6 +76,11 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage
return _count;
}
+ public List<Subscription> getSubscriptions()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean hasActiveSubscribers()
{
return getWeight() == 0;
@@ -85,9 +93,49 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage
public void queueDeleted(AMQQueue queue)
{
- if(queue instanceof ClusteredQueue)
+ if (queue instanceof ClusteredQueue)
{
((ClusteredQueue) queue).removeAllRemoteSubscriber(_peer);
}
}
+
+ public boolean hasFilters()
+ {
+ return false;
+ }
+
+ public boolean hasInterest(AMQMessage msg)
+ {
+ return true;
+ }
+
+ public Queue<AMQMessage> getPreDeliveryQueue()
+ {
+ return null;
+ }
+
+ public void enqueueForPreDelivery(AMQMessage msg)
+ {
+ //no-op -- if selectors are implemented on RemoteSubscriptions then look at SubscriptionImpl
+ }
+
+ public boolean isAutoClose()
+ {
+ return false;
+ }
+
+ public void close()
+ {
+ //no-op
+ }
+
+ public boolean isBrowser()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void sendNextMessage(AMQQueue queue)
+ {
+
+ }
}
diff --git a/java/common/src/main/java/log4j.properties b/java/common/src/main/java/log4j.properties
new file mode 100644
index 0000000000..6d596d1d19
--- /dev/null
+++ b/java/common/src/main/java/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+log4j.rootLogger=${root.logging.level}
+
+
+log4j.logger.org.apache.qpid=${amqj.logging.level}, console
+log4j.additivity.org.apache.qpid=false
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=all
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
diff --git a/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java b/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java
new file mode 100644
index 0000000000..dcd039b789
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid;
+
+import org.apache.qpid.protocol.AMQConstant;
+
+public class AMQInvalidSelectorException extends AMQException
+{
+ public AMQInvalidSelectorException(String message)
+ {
+ super(AMQConstant.INVALID_SELECTOR.getCode(),message);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java b/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
new file mode 100644
index 0000000000..56219755a3
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.common;
+
+public enum AMQPFilterTypes
+{
+ JMS_SELECTOR("x-filter-jms-selector"),
+ NO_CONSUME("x-filter-no-consume"),
+ AUTO_CLOSE("x-filter-auto-close");
+
+ private final String _value;
+
+ AMQPFilterTypes(String value)
+ {
+ _value = value;
+ }
+
+ public String getValue()
+ {
+ return _value;
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
new file mode 100644
index 0000000000..07371b5182
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.common;
+
+public enum ClientProperties
+{
+ instance,
+ product,
+ version,
+ platform
+}
diff --git a/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java b/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
new file mode 100644
index 0000000000..f4f764db1b
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.common;
+
+import org.apache.log4j.Logger;
+
+import java.util.Properties;
+import java.util.Map;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class QpidProperties
+{
+ private static final Logger _logger = Logger.getLogger(QpidProperties.class);
+
+ public static final String VERSION_RESOURCE = "qpidversion.properties";
+
+ public static final String PRODUCT_NAME_PROPERTY = "qpid.name";
+ public static final String RELEASE_VERSION_PROPERTY = "qpid.version";
+ public static final String BUILD_VERSION_PROPERTY = "qpid.svnversion";
+
+ private static final String DEFAULT = "unknown";
+
+ private static String productName = DEFAULT;
+ private static String releaseVersion = DEFAULT;
+ private static String buildVersion = DEFAULT;
+
+ /** Loads the values from the version properties file. */
+ static
+ {
+ Properties props = new Properties();
+
+ try
+ {
+ InputStream propertyStream = QpidProperties.class.getClassLoader().getResourceAsStream(VERSION_RESOURCE);
+ if (propertyStream == null)
+ {
+ _logger.warn("Unable to find resource " + VERSION_RESOURCE + " from classloader");
+ }
+ else
+ {
+ props.load(propertyStream);
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Dumping QpidProperties");
+ for (Map.Entry<Object,Object> entry : props.entrySet())
+ {
+ _logger.debug("Property: " + entry.getKey() + " Value: "+ entry.getValue());
+ }
+ _logger.debug("End of property dump");
+ }
+
+ productName = readPropertyValue(props, PRODUCT_NAME_PROPERTY);
+ releaseVersion = readPropertyValue(props, RELEASE_VERSION_PROPERTY);
+ buildVersion = readPropertyValue(props, BUILD_VERSION_PROPERTY);
+ }
+ }
+ catch (IOException e)
+ {
+ // Log a warning about this and leave the values initialized to unknown.
+ _logger.error("Could not load version.properties resource: " + e, e);
+ }
+ }
+
+ public static String getProductName()
+ {
+ return productName;
+ }
+
+ public static String getReleaseVersion()
+ {
+ return releaseVersion;
+ }
+
+ public static String getBuildVersion()
+ {
+ return buildVersion;
+ }
+
+ public static String getVersionString()
+ {
+ return getProductName() + " - " + getReleaseVersion() + " build: " + getBuildVersion();
+ }
+
+ private static String readPropertyValue(Properties props, String propertyName)
+ {
+ String retVal = (String) props.get(propertyName);
+ if (retVal == null)
+ {
+ retVal = DEFAULT;
+ }
+ return retVal;
+ }
+
+ public static void main(String[] args)
+ {
+ System.out.println(getVersionString());
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java
index 1292ff2f6e..4b8f56e4e8 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java
@@ -129,7 +129,7 @@ public class PropertyFieldTable implements FieldTable
}
catch (Exception e)
{
- _logger.warn("Unable to decode PropertyFieldTable format:" + textFormat, e);
+ _logger.warn("Unable to decode PropertyFieldTable format:" + textFormat);
throw new IllegalArgumentException("Unable to decode PropertyFieldTable format:" + textFormat);
}
}
@@ -483,7 +483,7 @@ public class PropertyFieldTable implements FieldTable
{
return _properties.containsKey(name) && (_properties.get(name) == null) &&
_propertyNamesTypeMap.get(name).equals(Prefix.AMQP_NULL_STRING_PROPERTY_PREFIX);
-
+
}
@@ -606,7 +606,8 @@ public class PropertyFieldTable implements FieldTable
// AMQ start character
if (!(Character.isLetter(propertyName.charAt(0))
|| propertyName.charAt(0) == '$'
- || propertyName.charAt(0) == '#'))
+ || propertyName.charAt(0) == '#'
+ || propertyName.charAt(0) == '_')) // Not official AMQP added for JMS.
{
throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid AMQP start character");
}
@@ -1156,9 +1157,9 @@ public class PropertyFieldTable implements FieldTable
if (type == null)
{
String msg = "Field '" + key + "' - unsupported field table type: " + type + ".";
- //some extra trace information...
- msg += " (" + iType + "), length=" + length + ", sizeRead=" + sizeRead + ", sizeRemaining=" + sizeRemaining;
- throw new AMQFrameDecodingException(msg);
+ //some extra trace information...
+ msg += " (" + iType + "), length=" + length + ", sizeRead=" + sizeRead + ", sizeRemaining=" + sizeRemaining;
+ throw new AMQFrameDecodingException(msg);
}
Object value;
@@ -1203,7 +1204,7 @@ public class PropertyFieldTable implements FieldTable
value = EncodingUtils.readBytes(buffer);
break;
default:
- String msg = "Internal error, the following type identifier is not handled: " + type;
+ String msg = "Internal error, the following type identifier is not handled: " + type;
throw new AMQFrameDecodingException(msg);
}
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
index fc83c0726d..a0d243ca30 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
@@ -55,7 +55,7 @@ public final class AMQConstant
{
return _name;
}
-
+
public static final AMQConstant FRAME_MIN_SIZE = new AMQConstant(4096, "frame min size", true);
public static final AMQConstant FRAME_END = new AMQConstant(206, "frame end", true);
@@ -74,6 +74,8 @@ public final class AMQConstant
public static final AMQConstant CONTEXT_UNKNOWN = new AMQConstant(321, "context unknown", true);
+ public static final AMQConstant INVALID_SELECTOR = new AMQConstant(322, "selector invalid", true);
+
public static final AMQConstant INVALID_PATH = new AMQConstant(402, "invalid path", true);
public static final AMQConstant ACCESS_REFUSED = new AMQConstant(403, "access refused", true);
diff --git a/java/distribution/pom.xml b/java/distribution/pom.xml
new file mode 100644
index 0000000000..ca91c222ee
--- /dev/null
+++ b/java/distribution/pom.xml
@@ -0,0 +1,142 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-distribution</artifactId>
+ <packaging>jar</packaging>
+ <name>Qpid Distribution</name>
+ <version>1.0-incubating-M2-SNAPSHOT</version>
+
+ <parent>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid</artifactId>
+ <version>1.0-incubating-M2-SNAPSHOT</version>
+ </parent>
+
+ <properties>
+ <topDirectoryLocation>..</topDirectoryLocation>
+ <java.source.version>1.5</java.source.version>
+ <qpid.version>${pom.version}</qpid.version>
+ <qpid.targetDir>${project.build.directory}</qpid.targetDir>
+ </properties>
+
+ <repositories>
+ <repository>
+ <id>repo1.maven.org</id>
+ <name>Maven eclipse Repository</name>
+ <url>http://repo1.maven.org/eclipse</url>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>${pom.groupId}</groupId>
+ <artifactId>qpid-common</artifactId>
+ <version>${pom.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${pom.groupId}</groupId>
+ <artifactId>qpid-broker</artifactId>
+ <version>${pom.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${pom.groupId}</groupId>
+ <artifactId>qpid-client</artifactId>
+ <version>${pom.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${pom.groupId}.management</groupId>
+ <artifactId>org.apache.qpid.management.ui</artifactId>
+ <version>${pom.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>${java.source.version}</source>
+ <target>${java.source.version}</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>${assembly.version}</version>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/bin.xml</descriptor>
+ </descriptors>
+ <finalName>qpid-${pom.version}</finalName>
+ <outputDirectory>${qpid.targetDir}</outputDirectory>
+ <tarLongFileMode>gnu</tarLongFileMode>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <finalName>qpid-incubating</finalName>
+ <archive>
+ <manifest>
+ <addClasspath>true</addClasspath>
+ </manifest>
+ </archive>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ <resources>
+ <resource>
+ <directory>src/main/java</directory>
+ <includes>
+ <include>**/*</include>
+ </includes>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>distribution-package</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/bin.xml</descriptor>
+ <descriptor>src/main/assembly/client-bin.xml</descriptor>
+ <descriptor>src/main/assembly/src.xml</descriptor>
+ <descriptor>src/main/assembly/management-eclipse-plugin.xml</descriptor>
+ <descriptor>src/main/assembly/management-eclipse-plugin-unix.xml</descriptor>
+ </descriptors>
+ <finalName>qpid-${pom.version}</finalName>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/java/distribution/src/main/assembly/bin.xml b/java/distribution/src/main/assembly/bin.xml
new file mode 100644
index 0000000000..9b0a56a744
--- /dev/null
+++ b/java/distribution/src/main/assembly/bin.xml
@@ -0,0 +1,172 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+<assembly>
+ <!-- id typically identifies the "type" (src vs bin etc) of the assembly -->
+ <id>java-bin</id>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <formats>
+ <format>tar.gz</format>
+ <format>zip</format>
+ </formats>
+
+ <fileSets>
+ <fileSet>
+ <directory>src/main/release</directory>
+ <outputDirectory>qpid-${qpid.version}</outputDirectory>
+ <includes>
+ <include>DISCLAIMER</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>..</directory>
+ <outputDirectory>qpid-${qpid.version}</outputDirectory>
+ <includes>
+ <include>*.txt</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>src/main/release/etc</directory>
+ <outputDirectory>qpid-${qpid.version}/etc</outputDirectory>
+ <includes>
+ <include>logging.properties</include>
+ <include>log4j.properties</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>src/main/release/docs</directory>
+ <outputDirectory>qpid-${qpid.version}/docs</outputDirectory>
+ <includes>
+ <include>RELEASE_NOTES.txt</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>target</directory>
+ <outputDirectory>qpid-${qpid.version}/lib</outputDirectory>
+ <includes>
+ <include>qpid-incubating.jar</include>
+ </includes>
+ </fileSet>
+ </fileSets>
+ <files>
+ <!-- due to a bug in the assembly plugin (MASSEMBLY-153) you have
+ to use decimal numbers to specify fileMode -->
+ <file>
+ <source>../common/etc/qpid-run.conf</source>
+ <outputDirectory>qpid-${qpid.version}/etc</outputDirectory>
+ <destName>qpid-run.conf</destName>
+ <fileMode>420</fileMode>
+ </file>
+ <file>
+ <source>../broker/etc/config.xml</source>
+ <outputDirectory>qpid-${qpid.version}/etc</outputDirectory>
+ <destName>config.xml</destName>
+ <fileMode>420</fileMode>
+ </file>
+ <file>
+ <source>../broker/etc/log4j.xml</source>
+ <outputDirectory>qpid-${qpid.version}/etc</outputDirectory>
+ <destName>log4j.xml</destName>
+ <fileMode>420</fileMode>
+ </file>
+ <file>
+ <source>../broker/etc/passwd</source>
+ <outputDirectory>qpid-${qpid.version}/etc</outputDirectory>
+ <destName>passwd</destName>
+ <fileMode>420</fileMode>
+ </file>
+ <file>
+ <source>../broker/etc/qpid-server.conf</source>
+ <outputDirectory>qpid-${qpid.version}/etc</outputDirectory>
+ <destName>qpid-server.conf</destName>
+ <fileMode>420</fileMode>
+ </file>
+ <file>
+ <source>../broker/etc/virtualhosts.xml</source>
+ <outputDirectory>qpid-${qpid.version}/etc</outputDirectory>
+ <destName>virtualhosts.xml</destName>
+ <fileMode>420</fileMode>
+ </file>
+ <file>
+ <source>../common/bin/qpid-run</source>
+ <outputDirectory>qpid-${qpid.version}/bin</outputDirectory>
+ <destName>qpid-run</destName>
+ <fileMode>493</fileMode>
+ </file>
+ <file>
+ <source>../broker/bin/qpid-server</source>
+ <outputDirectory>qpid-${qpid.version}/bin</outputDirectory>
+ <destName>qpid-server</destName>
+ <fileMode>493</fileMode>
+ </file>
+ <file>
+ <source>../broker/bin/qpid-server.bat</source>
+ <outputDirectory>qpid-${qpid.version}/bin</outputDirectory>
+ <destName>qpid-server.bat</destName>
+ <fileMode>493</fileMode>
+ </file>
+ <file>
+ <source>../broker/bin/run.bat</source>
+ <outputDirectory>qpid-${qpid.version}/bin</outputDirectory>
+ <destName>run.bat</destName>
+ <fileMode>493</fileMode>
+ </file>
+ <file>
+ <source>../broker/bin/run.sh</source>
+ <outputDirectory>qpid-${qpid.version}/bin</outputDirectory>
+ <destName>run.sh</destName>
+ <fileMode>493</fileMode>
+ </file>
+ <file>
+ <source>../broker/bin/runAll</source>
+ <outputDirectory>qpid-${qpid.version}/bin</outputDirectory>
+ <destName>runAll</destName>
+ <fileMode>493</fileMode>
+ </file>
+ </files>
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>qpid-${qpid.version}/lib</outputDirectory>
+ <unpack>false</unpack>
+ <excludes>
+ <exclude>org.apache.qpid:qpid-distribution</exclude>
+ <exclude>org.apache.qpid.management:org.apache.qpid.management.ui</exclude>
+ <exclude>org.eclipse.core:org.eclipse.core.commands</exclude>
+ <exclude>org.eclipse.core:org.eclipse.core.contenttype</exclude>
+ <exclude>org.eclipse.core:org.eclipse.core.expressions</exclude>
+ <exclude>org.eclipse.core:org.eclipse.core.jobs</exclude>
+ <exclude>org.eclipse.core:org.eclipse.core.runtime</exclude>
+ <exclude>org.eclipse.core:org.eclipse.core.runtime.compatibility.auth</exclude>
+ <exclude>org.eclipse.core:org.eclipse.core.runtime.compatibility.registry</exclude>
+ <exclude>org.eclipse.equinox:org.eclipse.equinox.common</exclude>
+ <exclude>org.eclipse.equinox:org.eclipse.equinox.preferences</exclude>
+ <exclude>org.eclipse.equinox:org.eclipse.equinox.registry</exclude>
+ <exclude>org.eclipse.help:org.eclipse.help</exclude>
+ <exclude>org.eclipse.jface:org.eclipse.jface</exclude>
+ <exclude>org.eclipse.osgi:org.eclipse.osgi</exclude>
+ <exclude>org.eclipse.swt:org.eclipse.swt</exclude>
+ <exclude>org.eclipse.swt:org.eclipse.swt.win32.win32.x86</exclude>
+ <exclude>org.eclipse.ui:org.eclipse.ui</exclude>
+ <exclude>org.eclipse.ui:org.eclipse.ui.forms</exclude>
+ <exclude>org.eclipse.ui:org.eclipse.ui.workbench</exclude>
+ </excludes>
+ <scope>runtime</scope>
+ </dependencySet>
+ </dependencySets>
+</assembly>
diff --git a/java/distribution/src/main/assembly/client-bin.xml b/java/distribution/src/main/assembly/client-bin.xml
new file mode 100644
index 0000000000..f89b1a39d2
--- /dev/null
+++ b/java/distribution/src/main/assembly/client-bin.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+<assembly>
+ <id>java-client-bin</id>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <formats>
+ <format>tar.gz</format>
+ <format>zip</format>
+ </formats>
+
+ <moduleSets>
+ <moduleSet>
+ <includes>
+ <include>org.apache.qpid:client</include>
+ </includes>
+ <binaries>
+ <includeDependencies>true</includeDependencies>
+ <unpack>false</unpack>
+ </binaries>
+ </moduleSet>
+ </moduleSets>
+
+ <fileSets>
+ <fileSet>
+ <directory>src/main/release</directory>
+ <outputDirectory>qpid-${qpid.version}</outputDirectory>
+ <includes>
+ <include>DISCLAIMER</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>..</directory>
+ <outputDirectory>qpid-${qpid.version}</outputDirectory>
+ <includes>
+ <include>*.txt</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>src/main/release/etc</directory>
+ <outputDirectory>qpid-${qpid.version}/etc</outputDirectory>
+ <includes>
+ <include>logging.properties</include>
+ <include>log4j.properties</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>src/main/release/docs</directory>
+ <outputDirectory>qpid-${qpid.version}/docs</outputDirectory>
+ <includes>
+ <include>RELEASE_NOTES.txt</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>target</directory>
+ <outputDirectory>qpid-${qpid.version}/lib</outputDirectory>
+ <includes>
+ <include>qpid-incubating.jar</include>
+ </includes>
+ </fileSet>
+ </fileSets>
+
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>qpid-${qpid.version}/lib</outputDirectory>
+ <unpack>false</unpack>
+ <excludes>
+ <exclude>org.apache.qpid:qpid-distribution</exclude>
+ <exclude>org.apache.qpid.management:org.apache.qpid.management.ui</exclude>
+ <exclude>org.eclipse.core:org.eclipse.core.commands</exclude>
+ <exclude>org.eclipse.core:org.eclipse.core.contenttype</exclude>
+ <exclude>org.eclipse.core:org.eclipse.core.expressions</exclude>
+ <exclude>org.eclipse.core:org.eclipse.core.jobs</exclude>
+ <exclude>org.eclipse.core:org.eclipse.core.runtime</exclude>
+ <exclude>org.eclipse.core:org.eclipse.core.runtime.compatibility.auth</exclude>
+ <exclude>org.eclipse.core:org.eclipse.core.runtime.compatibility.registry</exclude>
+ <exclude>org.eclipse.equinox:org.eclipse.equinox.common</exclude>
+ <exclude>org.eclipse.equinox:org.eclipse.equinox.preferences</exclude>
+ <exclude>org.eclipse.equinox:org.eclipse.equinox.registry</exclude>
+ <exclude>org.eclipse.help:org.eclipse.help</exclude>
+ <exclude>org.eclipse.jface:org.eclipse.jface</exclude>
+ <exclude>org.eclipse.osgi:org.eclipse.osgi</exclude>
+ <exclude>org.eclipse.swt:org.eclipse.swt</exclude>
+ <exclude>org.eclipse.swt:org.eclipse.swt.win32.win32.x86</exclude>
+ <exclude>org.eclipse.ui:org.eclipse.ui</exclude>
+ <exclude>org.eclipse.ui:org.eclipse.ui.forms</exclude>
+ <exclude>org.eclipse.ui:org.eclipse.ui.workbench</exclude>
+ </excludes>
+ <scope>runtime</scope>
+ </dependencySet>
+ </dependencySets>
+</assembly>
diff --git a/java/distribution/src/main/assembly/management-eclipse-plugin-unix.xml b/java/distribution/src/main/assembly/management-eclipse-plugin-unix.xml
new file mode 100644
index 0000000000..5ac131b12b
--- /dev/null
+++ b/java/distribution/src/main/assembly/management-eclipse-plugin-unix.xml
@@ -0,0 +1,112 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+<assembly>
+ <!-- id typically identifies the "type" (src vs bin etc) of the assembly -->
+ <id>eclipse-plugin-unix</id>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <formats>
+ <format>tar.gz</format>
+ <format>zip</format>
+ </formats>
+<!--
+ <moduleSets>
+ <moduleSet>
+ <includes>
+ <include>org.apache.qpid.management:org.apache.qpid.management.ui</include>
+ </includes>
+ <binaries>
+ <includeDependencies>true</includeDependencies>
+ <unpack>false</unpack>
+ </binaries>
+ </moduleSet>
+ </moduleSets>
+ -->
+ <fileSets>
+ <fileSet>
+ <directory>src/main/release</directory>
+ <outputDirectory>qpidmc</outputDirectory>
+ <includes>
+ <include>DISCLAIMER</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>..</directory>
+ <outputDirectory>qpidmc</outputDirectory>
+ <includes>
+ <include>*.txt</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>src/main/release/docs</directory>
+ <outputDirectory>qpidmc/docs</outputDirectory>
+ <includes>
+ <include>RELEASE_NOTES.txt</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>../management/eclipse-plugin/src/main/resources/unix/configuration</directory>
+ <outputDirectory>qpidmc/configuration</outputDirectory>
+ <includes>
+ <include>**</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>../management/eclipse-plugin/src/main/resources</directory>
+ <outputDirectory>qpidmc</outputDirectory>
+ <includes>
+ <include>license.eclipse.txt</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>../management/eclipse-plugin/bin</directory>
+ <outputDirectory>qpidmc/bin</outputDirectory>
+ <includes>
+ <include>qpidmc.sh</include>
+ </includes>
+ <fileMode>777</fileMode>
+ </fileSet>
+ </fileSets>
+
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>qpidmc/eclipse/plugins/org.eclipse.core.runtime.compatibility.registry_3.2.0</outputDirectory>
+ <outputFileNameMapping>${artifactId}_${version}/</outputFileNameMapping>
+ <unpack>true</unpack>
+ <includes>
+ <include>org.eclipse.core:org.eclipse.core.runtime.compatibility.registry</include>
+ </includes>
+ <scope>runtime</scope>
+ </dependencySet>
+
+ <dependencySet>
+ <outputDirectory>qpidmc/eclipse/plugins</outputDirectory>
+ <outputFileNameMapping>${artifactId}_${version}.${extension}</outputFileNameMapping>
+ <unpack>false</unpack>
+ <excludes>
+ <exclude>org.apache.qpid:qpid-distribution</exclude>
+ </excludes>
+ <includes>
+ <include>org.eclipse.ui:org.eclipse.ui.forms</include>
+ <include>org.apache.qpid.management:org.apache.qpid.management.ui</include>
+ </includes>
+ <scope>runtime</scope>
+ </dependencySet>
+</dependencySets>
+</assembly>
diff --git a/java/distribution/src/main/assembly/management-eclipse-plugin.xml b/java/distribution/src/main/assembly/management-eclipse-plugin.xml
new file mode 100644
index 0000000000..f6c2399785
--- /dev/null
+++ b/java/distribution/src/main/assembly/management-eclipse-plugin.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+<assembly>
+ <!-- id typically identifies the "type" (src vs bin etc) of the assembly -->
+ <id>management-console-win32</id>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <formats>
+ <format>zip</format>
+ </formats>
+<!--
+ <moduleSets>
+ <moduleSet>
+ <includes>
+ <include>org.apache.qpid.management:org.apache.qpid.management.ui</include>
+ </includes>
+ <binaries>
+ <includeDependencies>true</includeDependencies>
+ <unpack>false</unpack>
+ </binaries>
+ </moduleSet>
+ </moduleSets>
+ -->
+ <fileSets>
+ <fileSet>
+ <directory>src/main/release</directory>
+ <outputDirectory>qpidmc</outputDirectory>
+ <includes>
+ <include>DISCLAIMER</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>..</directory>
+ <outputDirectory>qpidmc</outputDirectory>
+ <includes>
+ <include>*.txt</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>src/main/release/docs</directory>
+ <outputDirectory>qpidmc/docs</outputDirectory>
+ <includes>
+ <include>RELEASE_NOTES.txt</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>../management/eclipse-plugin/src/main/resources/win32/configuration</directory>
+ <outputDirectory>qpidmc/configuration</outputDirectory>
+ <includes>
+ <include>**</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>../management/eclipse-plugin/src/main/resources</directory>
+ <outputDirectory>qpidmc/eclipse</outputDirectory>
+ <includes>
+ <include>*.*</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>../management/eclipse-plugin/bin</directory>
+ <outputDirectory>qpidmc/bin</outputDirectory>
+ <includes>
+ <include>**</include>
+ </includes>
+ <fileMode>777</fileMode>
+ </fileSet>
+ </fileSets>
+
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>qpidmc/eclipse/plugins</outputDirectory>
+ <outputFileNameMapping>${artifactId}_${version}.${extension}</outputFileNameMapping>
+ <unpack>false</unpack>
+ <excludes>
+ <exclude>org.apache.qpid:qpid-distribution</exclude>
+ <exclude>org.apache.qpid:qpid-common</exclude>
+ <exclude>org.apache.qpid:qpid-broker</exclude>
+ <exclude>org.apache.qpid:qpid-client</exclude>
+ <exclude>commons-cli:commons-cli</exclude>
+ <exclude>commons-configuration:commons-configuration</exclude>
+ <exclude>commons-lang:commons-lang</exclude>
+ <exclude>org.apache.mina:mina-filter-ssl</exclude>
+ <exclude>org.apache.mina:mina-java5</exclude>
+ <exclude>backport-util-concurrent:backport-util-concurrent</exclude>
+ <exclude>org.slf4j:slf4j-simple</exclude>
+ <exclude>junit:junit</exclude>
+ <exclude>org.easymock:easymockclassextension</exclude>
+ <exclude>commons-codec:commons-codec</exclude>
+ <exclude>org.apache.geronimo.specs:geronimo-jms_1.1_spec</exclude>
+ <exclude>commons-collections:commons-collections</exclude>
+ <exclude>commons-lang:commons-lang</exclude>
+ <exclude>org.apache.mina:mina-core</exclude>
+ <exclude>commons-beanutils:commons-beanutils</exclude>
+ <exclude>commons-beanutils:commons-beanutils-core</exclude>
+ <exclude>commons-digester:commons-digester</exclude>
+ <exclude>commons-logging:commons-logging</exclude>
+ <exclude>commons-logging:commons-logging-api</exclude>
+ <exclude>dom4j:dom4j</exclude>
+ <exclude>isorelax:isorelax</exclude>
+ <exclude>jaxen:jaxen</exclude>
+ <exclude>log4j:log4j</exclude>
+ <exclude>msv:msv</exclude>
+ <exclude>xalan:xalan</exclude>
+ <exclude>xml-apis:xml-apis</exclude>
+ <exclude>saxpath:saxpath</exclude>
+ <exclude>servletapi:servletapi</exclude>
+ <exclude>relaxngDatatype:relaxngDatatype</exclude>
+ <exclude>xerces:xercesImpl</exclude>
+ <exclude>javax.servlet:servlet-api</exclude>
+ <exclude>org.eclipse.core:org.eclipse.core.runtime.compatibility.registry</exclude>
+ </excludes>
+ <scope>runtime</scope>
+ </dependencySet>
+ <dependencySet>
+ <outputDirectory>qpidmc/eclipse/plugins/org.eclipse.core.runtime.compatibility.registry_3.2.0</outputDirectory>
+ <outputFileNameMapping>${artifactId}_${version}/</outputFileNameMapping>
+ <unpack>true</unpack>
+ <includes>
+ <include>org.eclipse.core:org.eclipse.core.runtime.compatibility.registry</include>
+ </includes>
+ <scope>runtime</scope>
+ </dependencySet>
+ </dependencySets>
+
+</assembly>
diff --git a/java/distribution/src/main/assembly/src.xml b/java/distribution/src/main/assembly/src.xml
new file mode 100644
index 0000000000..b66425c3d2
--- /dev/null
+++ b/java/distribution/src/main/assembly/src.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+<assembly>
+ <!-- id typically identifies the "type" (src vs bin etc) of the assembly -->
+ <id>java-src</id>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <formats>
+ <format>tar.gz</format>
+ <format>zip</format>
+ </formats>
+
+ <fileSets>
+ <fileSet>
+ <directory>src/main/release</directory>
+ <outputDirectory>qpid-${qpid.version}-src</outputDirectory>
+ <includes>
+ <include>DISCLAIMER</include>
+ <include>LICENSE</include>
+ <include>licenses/*.*</include>
+ <include>NOTICE</include>
+ <include>README</include>
+ <include>BUILDING.txt</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>..</directory>
+ <outputDirectory>qpid-${qpid.version}-src</outputDirectory>
+ <includes>
+ <include>**/*</include>
+ </includes>
+ <excludes>
+ <exclude>build.xml</exclude>
+ <exclude>distribution/build.xml</exclude>
+ <exclude>benchmark</exclude>
+ <exclude>benchmark/**/*</exclude>
+ <exclude>**/target</exclude>
+ <exclude>**/target/**/*</exclude>
+ <exclude>**/build</exclude>
+ <exclude>**/build/**/*</exclude>
+ <exclude>**/.settings</exclude>
+ <exclude>**/.classpath</exclude>
+ <exclude>**/.project</exclude>
+ <exclude>**/.wtpmodules</exclude>
+ <exclude>**/surefire*</exclude>
+ <exclude>**/cobertura.ser</exclude>
+ <exclude>bin</exclude>
+ <exclude>bin/*</exclude>
+ <exclude>lib</exclude>
+ <exclude>lib/**/*</exclude>
+ <exclude>**/var/journal</exclude>
+ <exclude>**/build.out*</exclude>
+ <exclude>**/eclipse-plugin/bin/**</exclude>
+ <exclude>**/eclipse-plugin/plugins/**</exclude>
+ <exclude>**/eclipse-plugin/src/main/resources/**</exclude>
+ </excludes>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/java/distribution/src/main/release/DISCLAIMER b/java/distribution/src/main/release/DISCLAIMER
new file mode 100644
index 0000000000..c9a0ddf8f9
--- /dev/null
+++ b/java/distribution/src/main/release/DISCLAIMER
@@ -0,0 +1,5 @@
+Apache Qpid is an effort undergoing incubation at the Apache Software Foundation (ASF), sponsored by the Apache Incubator PMC.
+
+Incubation is required of all newly accepted projects until a further review indicates that the infrastructure, communications, and decision making process have stabilized in a manner consistent with other successful ASF projects.
+
+While incubation status is not necessarily a reflection of the completeness or stability of the code, it does indicate that the project has yet to be fully endorsed by the ASF.
diff --git a/java/pom.xml b/java/pom.xml
index dd5280cfde..b45c14b91b 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -87,6 +87,7 @@
<eclipse.plugin.version>2.2</eclipse.plugin.version>
<jar.version>2.0</jar.version>
<javadoc.version>2.0</javadoc.version>
+ <junit.version>3.8.1</junit.version>
<jxr.version>2.0</jxr.version>
<mprojectinfo.version>2.0</mprojectinfo.version>
<resources.version>2.2</resources.version>
@@ -94,9 +95,10 @@
<surefire-report.version>2.1-SNAPSHOT</surefire-report.version>
<surefire.version>2.2</surefire.version>
- <amqj.logging.level>debug</amqj.logging.level>
+ <amqj.logging.level>warn</amqj.logging.level>
<eclipse.workspace.dir>${basedir}/${topDirectoryLocation}/../workspace</eclipse.workspace.dir>
+ <clover.license.pathname>/set/clover/license/path/here</clover.license.pathname>
</properties>
<modules>
@@ -160,6 +162,7 @@
<pluginManagement>
<plugins>
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
@@ -172,6 +175,52 @@
</dependency>
</dependencies>
+
+ <executions>
+
+ <!-- This Ant task writes the module name, version and the Subversion version information out to a properties file.
+ The svnversion command must be available to run from the command line for this to work. The build will not fail if
+ svnversion cannot be run though.
+ This is done during the 'compile' phase to reflect the version of the currently compiled code and to ensure that
+ these properties are up to date when running from a file system classpath. Consider moving this to, or running a second
+ time, during the 'package' phase to capture the version of any resources added to jar files.
+ This svnversion command is always run in the top directory to accurately reflect the svnversion range accross all modules
+ at the time of the build.
+ The properties are placed into a file 'version.properties' in the target/classes directory of any child module that runs
+ this plugin.
+ The 'version.properties' file is loaded by the org.apache.qpid.common.QpidProperties class.
+ Be carefull of the possibility that the 'common' module may run this antrun plugin and recieve its own set of
+ version.properties and then the client or broker being built against an older version of the common library ending up with
+ the wrong version information. This is unlikely to happen because the client or broker should pick up its own properties
+ from the classpath first. If this happens it will be obvious because the productName property will be
+ 'Qpid Common Utilities'. If this is a problem then push this ant task down into the client and broker poms and remove it
+ from here.
+ -->
+ <execution>
+ <id>version_properties</id>
+ <phase>compile</phase>
+ <configuration>
+ <tasks>
+
+ <exec executable="svnversion" spawn="false" failifexecutionfails="false"
+ dir="${topDirectoryLocation}" outputproperty="svnversion">
+ <arg line="."/>
+ </exec>
+
+ <!-- Write the version.properties out. -->
+ <propertyfile file="target/classes/qpidversion.properties">
+ <entry key="qpid.svnversion" value="${svnversion}"/>
+ <entry key="qpid.name" value="${project.name}"/>
+ <entry key="qpid.version" value="${project.version}"/>
+ </propertyfile>
+
+ </tasks>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -241,9 +290,25 @@
<downloadJavadocs>true</downloadJavadocs>
</configuration>
</plugin>
-
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-clover-plugin</artifactId>
+ <version>2.3</version>
+ <configuration>
+ <licenseLocation>${clover.license.pathname}</licenseLocation>
+ <jdk>${java.source.version}</jdk>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>verify</phase>
+ <goals>
+ <goal>instrument</goal>
+ <goal>aggregate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
-
</pluginManagement>
<defaultGoal>install</defaultGoal>
</build>
@@ -335,7 +400,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>3.8.1</version>
+ <version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -399,6 +464,10 @@
<artifactId>maven-javadoc-plugin</artifactId>
<version>${javadoc.version}</version>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-clover-plugin</artifactId>
+ </plugin>
</plugins>
</reporting>
diff --git a/java/systests/pom.xml b/java/systests/pom.xml
index 93c8a2333b..c73e5f2c44 100644
--- a/java/systests/pom.xml
+++ b/java/systests/pom.xml
@@ -34,7 +34,6 @@
<properties>
<topDirectoryLocation>..</topDirectoryLocation>
- <amqj.logging.level>warn</amqj.logging.level>
</properties>
<dependencies>
diff --git a/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
index a0765f6924..258bcecc41 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -95,7 +95,8 @@ public class TxAckTest extends TestCase
Scenario(int messageCount, List<Long> acked, List<Long> unacked)
{
TransactionalContext txnContext = new NonTransactionalContext(new TestableMemoryMessageStore(), null,
- new LinkedList<RequiredDeliveryException>());
+ new LinkedList<RequiredDeliveryException>(),
+ new HashSet<Long>());
for(int i = 0; i < messageCount; i++)
{
long deliveryTag = i + 1;
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 991a098678..5909ac048b 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -202,7 +202,8 @@ public class AbstractHeadersExchangeTestBase extends TestCase
private static MessageStore _messageStore = new SkeletonMessageStore();
private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, null,
- new LinkedList<RequiredDeliveryException>());
+ new LinkedList<RequiredDeliveryException>(),
+ new HashSet<Long>());
Message(String id, String... headers) throws AMQException
{
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
new file mode 100644
index 0000000000..c8a87a0a0e
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
@@ -0,0 +1,124 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import junit.framework.TestCase;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+import java.util.ArrayList;
+
+/**
+ * Unit test class for testing different Exchange MBean operations
+ */
+public class ExchangeMBeanTest extends TestCase
+{
+ private AMQQueue _queue;
+ private QueueRegistry _queueRegistry;
+
+ /**
+ * Test for direct exchange mbean
+ * @throws Exception
+ */
+ public void testDirectExchangeMBean() throws Exception
+ {
+ DestNameExchange exchange = new DestNameExchange();
+ exchange.initialise("amq.direct", false, 0, true);
+ ManagedObject managedObj = exchange.getManagedObject();
+ ManagedExchange mbean = (ManagedExchange)managedObj;
+
+ mbean.createNewBinding(_queue.getName(), "binding1");
+ mbean.createNewBinding(_queue.getName(), "binding2");
+
+ TabularData data = mbean.bindings();
+ ArrayList<CompositeData> list = new ArrayList<CompositeData>(data.values());
+ assertTrue(list.size() == 2);
+
+ // test general exchange properties
+ assertEquals(mbean.getName(), "amq.direct");
+ assertEquals(mbean.getExchangeType(), "direct");
+ assertTrue(mbean.getTicketNo() == 0);
+ assertTrue(!mbean.isDurable());
+ assertTrue(mbean.isAutoDelete());
+ }
+
+ /**
+ * Test for "topic" exchange mbean
+ * @throws Exception
+ */
+ public void testTopicExchangeMBean() throws Exception
+ {
+ DestWildExchange exchange = new DestWildExchange();
+ exchange.initialise("amq.topic", false, 0, true);
+ ManagedObject managedObj = exchange.getManagedObject();
+ ManagedExchange mbean = (ManagedExchange)managedObj;
+
+ mbean.createNewBinding(_queue.getName(), "binding1");
+ mbean.createNewBinding(_queue.getName(), "binding2");
+
+ TabularData data = mbean.bindings();
+ ArrayList<CompositeData> list = new ArrayList<CompositeData>(data.values());
+ assertTrue(list.size() == 2);
+
+ // test general exchange properties
+ assertEquals(mbean.getName(), "amq.topic");
+ assertEquals(mbean.getExchangeType(), "topic");
+ assertTrue(mbean.getTicketNo() == 0);
+ assertTrue(!mbean.isDurable());
+ assertTrue(mbean.isAutoDelete());
+ }
+
+ /**
+ * Test for "Headers" exchange mbean
+ * @throws Exception
+ */
+ public void testHeadersExchangeMBean() throws Exception
+ {
+ HeadersExchange exchange = new HeadersExchange();
+ exchange.initialise("amq.headers", false, 0, true);
+ ManagedObject managedObj = exchange.getManagedObject();
+ ManagedExchange mbean = (ManagedExchange)managedObj;
+
+ mbean.createNewBinding(_queue.getName(), "key1=binding1,key2=binding2");
+ mbean.createNewBinding(_queue.getName(), "key3=binding3");
+
+ TabularData data = mbean.bindings();
+ ArrayList<CompositeData> list = new ArrayList<CompositeData>(data.values());
+ assertTrue(list.size() == 2);
+
+ // test general exchange properties
+ assertEquals(mbean.getName(), "amq.headers");
+ assertEquals(mbean.getExchangeType(), "headers");
+ assertTrue(mbean.getTicketNo() == 0);
+ assertTrue(!mbean.isDurable());
+ assertTrue(mbean.isAutoDelete());
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _queueRegistry = ApplicationRegistry.getInstance().getQueueRegistry();
+ _queue = new AMQQueue("testQueue", false, "ExchangeMBeanTest", false, _queueRegistry);
+ _queueRegistry.registerQueue(_queue);
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
index ec6a82cc29..b125bc1d4c 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
@@ -6,6 +6,7 @@ import org.apache.qpid.test.VMBrokerSetup;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.client.*;
+import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -38,12 +39,14 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex
protected void setUp() throws Exception
{
super.setUp();
+ TransportConnection.createVMBroker(1);
ApplicationRegistry.initialise(new TestApplicationRegistry(), 1);
}
protected void tearDown() throws Exception
{
super.tearDown();
+ TransportConnection.killAllVMBrokers();
}
/**
@@ -56,13 +59,14 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex
_bouncedMessageList.clear();
Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+
AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- AMQHeadersExchange queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS+"://"+ExchangeDefaults.HEADERS_EXCHANGE_NAME+"/test/queue1?"+ BindingURL.OPTION_ROUTING_KEY+"='F0000=1'"));
+ AMQHeadersExchange queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS + "://" + ExchangeDefaults.HEADERS_EXCHANGE_NAME + "/test/queue1?" + BindingURL.OPTION_ROUTING_KEY + "='F0000=1'"));
FieldTable ft = new PropertyFieldTable();
- ft.setString("F1000","1");
- MessageConsumer consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String)null, ft);
-
+ ft.setString("F1000", "1");
+ MessageConsumer consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String) null, ft);
+
//force synch to ensure the consumer has resulted in a bound queue
((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
@@ -76,49 +80,45 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex
con2.start();
- MessageProducer nonMandatoryProducer = producerSession.createProducer(queue,false,false);
+ MessageProducer nonMandatoryProducer = producerSession.createProducer(queue, false, false);
MessageProducer mandatoryProducer = producerSession.createProducer(queue);
-
// First test - should neither be bounced nor routed
_logger.info("Sending non-routable non-mandatory message");
- TextMessage msg1 = producerSession.createTextMessage("msg1");
+ TextMessage msg1 = producerSession.createTextMessage("msg1");
nonMandatoryProducer.send(msg1);
// Second test - should be bounced
_logger.info("Sending non-routable mandatory message");
- TextMessage msg2 = producerSession.createTextMessage("msg2");
+ TextMessage msg2 = producerSession.createTextMessage("msg2");
mandatoryProducer.send(msg2);
// Third test - should be routed
_logger.info("Sending routable message");
- TextMessage msg3 = producerSession.createTextMessage("msg3");
- msg3.setStringProperty("F1000","1");
+ TextMessage msg3 = producerSession.createTextMessage("msg3");
+ msg3.setStringProperty("F1000", "1");
mandatoryProducer.send(msg3);
-
_logger.info("Starting consumer connection");
con.start();
TextMessage tm = (TextMessage) consumer.receive(1000L);
- assertTrue("No message routed to receiver",tm != null);
- assertTrue("Wrong message routed to receiver: "+tm.getText(),"msg3".equals(tm.getText()));
+ assertTrue("No message routed to receiver", tm != null);
+ assertTrue("Wrong message routed to receiver: " + tm.getText(), "msg3".equals(tm.getText()));
try
{
Thread.sleep(1000L);
}
- catch(InterruptedException e)
+ catch (InterruptedException e)
{
;
}
- assertTrue("Wrong number of messages bounced (expect 1): "+_bouncedMessageList.size(),_bouncedMessageList.size()==1);
+ assertTrue("Wrong number of messages bounced (expect 1): " + _bouncedMessageList.size(), _bouncedMessageList.size() == 1);
Message m = _bouncedMessageList.get(0);
- assertTrue("Wrong message bounced: "+m.toString(),m.toString().contains("msg2"));
-
-
+ assertTrue("Wrong message bounced: " + m.toString(), m.toString().contains("msg2"));
con.close();
@@ -129,18 +129,23 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(ReturnUnroutableMandatoryMessageTest.class));
+ return new junit.framework.TestSuite(ReturnUnroutableMandatoryMessageTest.class);
}
public void onException(JMSException jmsException)
{
- _logger.warn("Caught exception on producer: ",jmsException);
+
Exception linkedException = jmsException.getLinkedException();
- if(linkedException instanceof AMQNoRouteException)
+ if (linkedException instanceof AMQNoRouteException)
{
AMQNoRouteException noRoute = (AMQNoRouteException) linkedException;
Message bounced = (Message) noRoute.getUndeliveredMessage();
_bouncedMessageList.add(bounced);
+ _logger.info("Caught expected NoRouteException");
+ }
+ else
+ {
+ _logger.warn("Caught exception on producer: ", jmsException);
}
}
}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
new file mode 100644
index 0000000000..c2ac099855
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
@@ -0,0 +1,110 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import junit.framework.TestCase;
+import org.apache.mina.common.IoSession;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.queue.DefaultQueueRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.AMQException;
+
+import javax.management.JMException;
+
+/**
+ * Test class to test MBean operations for AMQMinaProtocolSession.
+ */
+public class AMQProtocolSessionMBeanTest extends TestCase
+{
+ private IoSession _mockIOSession;
+ private MessageStore _messageStore = new SkeletonMessageStore();
+ private AMQMinaProtocolSession _protocolSession;
+ private AMQChannel _channel;
+ private QueueRegistry _queueRegistry;
+ private ExchangeRegistry _exchangeRegistry;
+ private AMQProtocolSessionMBean _mbean;
+
+ public void testChannels() throws Exception
+ {
+ // check the channel count is correct
+ int channelCount = _mbean.channels().size();
+ assertTrue(channelCount == 1);
+ _protocolSession.addChannel(new AMQChannel(2, _messageStore, null));
+ channelCount = _mbean.channels().size();
+ assertTrue(channelCount == 2);
+
+ // general properties test
+ _mbean.setMaximumNumberOfChannels(1000L);
+ assertTrue(_mbean.getMaximumNumberOfChannels() == 1000L);
+
+ // check APIs
+ AMQChannel channel3 = new AMQChannel(3, _messageStore, null);
+ channel3.setLocalTransactional();
+ _protocolSession.addChannel(channel3);
+ _mbean.rollbackTransactions(2);
+ _mbean.rollbackTransactions(3);
+ _mbean.commitTransactions(2);
+ _mbean.commitTransactions(3);
+
+ // This should throw exception, because the channel does't exist
+ try
+ {
+ _mbean.commitTransactions(4);
+ fail();
+ }
+ catch (JMException ex)
+ {
+ System.out.println("expected exception is thrown :" + ex.getMessage());
+ }
+
+ // check if closing of session works
+ _protocolSession.addChannel(new AMQChannel(5, _messageStore, null));
+ _mbean.closeConnection();
+ try
+ {
+ channelCount = _mbean.channels().size();
+ assertTrue(channelCount == 0);
+ // session is now closed so adding another channel should throw an exception
+ _protocolSession.addChannel(new AMQChannel(6, _messageStore, null));
+ fail();
+ }
+ catch(AMQException ex)
+ {
+ System.out.println("expected exception is thrown :" + ex.getMessage());
+ }
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _channel = new AMQChannel(1, _messageStore, null);
+ _queueRegistry = new DefaultQueueRegistry();
+ _exchangeRegistry = new DefaultExchangeRegistry(new DefaultExchangeFactory());
+ _mockIOSession = new MockIoSession();
+ _protocolSession = new AMQMinaProtocolSession(_mockIOSession, _queueRegistry, _exchangeRegistry, new AMQCodecFactory(true));
+ _protocolSession.addChannel(_channel);
+ _mbean = (AMQProtocolSessionMBean)_protocolSession.getManagedObject();
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java b/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java
index 81dea32a76..cf6366b513 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java
@@ -25,6 +25,7 @@ import org.apache.mina.common.support.DefaultCloseFuture;
import org.apache.mina.common.support.DefaultWriteFuture;
import java.net.SocketAddress;
+import java.net.InetSocketAddress;
import java.util.Set;
public class MockIoSession implements IoSession
@@ -151,7 +152,7 @@ public class MockIoSession implements IoSession
public SocketAddress getRemoteAddress()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return new InetSocketAddress("127.0.0.1", 1234); //To change body of implemented methods use File | Settings | File Templates.
}
public SocketAddress getLocalAddress()
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 4fcc691a2f..562452d729 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -30,18 +30,20 @@ import org.apache.qpid.server.store.SkeletonMessageStore;
import javax.management.JMException;
import java.util.LinkedList;
+import java.util.HashSet;
/**
* Test class to test AMQQueueMBean attribtues and operations
*/
-public class AMQQueueMBeanTest extends TestCase
+public class AMQQueueMBeanTest extends TestCase
{
private AMQQueue _queue;
private AMQQueueMBean _queueMBean;
private QueueRegistry _queueRegistry;
private MessageStore _messageStore = new SkeletonMessageStore();
private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, null,
- new LinkedList<RequiredDeliveryException>());
+ new LinkedList<RequiredDeliveryException>(),
+ new HashSet<Long>());
private MockProtocolSession _protocolSession;
private AMQChannel _channel;
@@ -68,14 +70,14 @@ public class AMQQueueMBeanTest extends TestCase
assertFalse(mgr.hasActiveSubscribers());
assertTrue(_queueMBean.getActiveConsumerCount() == 0);
- _channel = new AMQChannel(1, _messageStore, null);
+ _channel = new AMQChannel(1, _messageStore, null);
_protocolSession = new MockProtocolSession(_messageStore);
_protocolSession.addChannel(_channel);
- _queue.registerProtocolSession(_protocolSession, 1, "test", false);
+ _queue.registerProtocolSession(_protocolSession, 1, "test", false, null);
assertTrue(_queueMBean.getActiveConsumerCount() == 1);
- SubscriptionSet _subscribers = (SubscriptionSet)mgr;
+ SubscriptionSet _subscribers = (SubscriptionSet) mgr;
SubscriptionTestHelper s1 = new SubscriptionTestHelper("S1");
SubscriptionTestHelper s2 = new SubscriptionTestHelper("S2");
_subscribers.addSubscriber(s1);
@@ -167,7 +169,7 @@ public class AMQQueueMBeanTest extends TestCase
super.setUp();
_queueRegistry = new DefaultQueueRegistry();
_queue = new AMQQueue("testQueue", false, "AMQueueMBeanTest", false, _queueRegistry);
- _queueMBean = new AMQQueueMBean(_queue);
+ _queueMBean = new AMQQueueMBean(_queue);
}
private void sendMessages(int messageCount) throws AMQException
@@ -175,7 +177,8 @@ public class AMQQueueMBeanTest extends TestCase
AMQMessage[] messages = new AMQMessage[messageCount];
for (int i = 0; i < messages.length; i++)
{
- messages[i] = message(false);;
+ messages[i] = message(false);
+ ;
}
for (int i = 0; i < messageCount; i++)
{
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
index 1911d38cd2..d4ea728e95 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -38,6 +38,7 @@ import org.apache.qpid.server.util.TestApplicationRegistry;
import java.util.LinkedList;
import java.util.Set;
+import java.util.HashSet;
/**
* Tests that acknowledgements are handled correctly.
@@ -82,7 +83,8 @@ public class AckTest extends TestCase
private void publishMessages(int count, boolean persistent) throws AMQException
{
TransactionalContext txnContext = new NonTransactionalContext(_messageStore, null,
- new LinkedList<RequiredDeliveryException>());
+ new LinkedList<RequiredDeliveryException>(),
+ new HashSet<Long>());
MessageHandleFactory factory = new MessageHandleFactory();
for (int i = 1; i <= count; i++)
{
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java
index 8cf84e0dcf..7843d8a182 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java
@@ -34,13 +34,15 @@ import org.apache.qpid.AMQException;
import junit.framework.TestCase;
import java.util.LinkedList;
+import java.util.HashSet;
class MessageTestHelper extends TestCase
{
private final MessageStore _messageStore = new SkeletonMessageStore();
private final TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, null,
- new LinkedList<RequiredDeliveryException>());
+ new LinkedList<RequiredDeliveryException>(),
+ new HashSet<Long>());
MessageTestHelper() throws Exception
{
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
index bb8fd5bc19..87e5c43932 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
@@ -121,4 +122,13 @@ public class MockProtocolSession implements AMQProtocolSession
public void setSaslServer(SaslServer saslServer)
{
}
+
+ public FieldTable getClientProperties()
+ {
+ return null;
+ }
+
+ public void setClientProperties(FieldTable clientProperties)
+ {
+ }
}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index 2773c810d2..fea3c93280 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
import java.util.ArrayList;
import java.util.List;
+import java.util.Queue;
public class SubscriptionTestHelper implements Subscription
{
@@ -70,6 +71,41 @@ public class SubscriptionTestHelper implements Subscription
{
}
+ public boolean hasFilters()
+ {
+ return false;
+ }
+
+ public boolean hasInterest(AMQMessage msg)
+ {
+ return true;
+ }
+
+ public Queue<AMQMessage> getPreDeliveryQueue()
+ {
+ return null;
+ }
+
+ public void enqueueForPreDelivery(AMQMessage msg)
+ {
+ //no-op
+ }
+
+ public boolean isAutoClose()
+ {
+ return false;
+ }
+
+ public void close()
+ {
+ //no-op
+ }
+
+ public boolean isBrowser()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public int hashCode()
{
return key.hashCode();
diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
index 97c9becf18..a40a9bf12f 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
@@ -49,7 +49,7 @@ public class TestReferenceCounting extends TestCase
{
createPersistentContentHeader();
AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody(),
- new NonTransactionalContext(_store, null, null),
+ new NonTransactionalContext(_store, null, null, null),
createPersistentContentHeader());
message.incrementReference();
// we call routing complete to set up the handle
@@ -71,7 +71,7 @@ public class TestReferenceCounting extends TestCase
public void testMessageRemains() throws AMQException
{
AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody(),
- new NonTransactionalContext(_store, null, null),
+ new NonTransactionalContext(_store, null, null, null),
createPersistentContentHeader());
message.incrementReference();
// we call routing complete to set up the handle