summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-03-01 10:20:36 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-03-01 10:20:36 +0000
commite1126b978d046384380eeaa5a9b309618ddc7df5 (patch)
tree99cc5a14de6ee40ee72f75fecb6efbefc7c90805
parented972141474d38a2c07818e0a455d897d3c8976e (diff)
downloadqpid-python-e1126b978d046384380eeaa5a9b309618ddc7df5.tar.gz
NO-JIRA: [AMQP 1-0 Sandbox] merging from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1295495 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/java/bdbstore/bin/backup.sh2
-rwxr-xr-xqpid/java/bdbstore/bin/storeUpgrade.sh2
-rw-r--r--qpid/java/bdbstore/build.xml8
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java)0
-rw-r--r--qpid/java/broker-plugins/experimental/info/src/main/java/org/apache/qpid/info/AppInfo.java4
-rw-r--r--qpid/java/broker-plugins/experimental/info/src/test/java/org/apache/qpid/info/systest/InfoPluginTest.java5
-rwxr-xr-xqpid/java/broker-plugins/experimental/shutdown/src/main/java/shutdown.bnd2
-rw-r--r--qpid/java/broker/bin/qpid-server.bat4
-rw-r--r--qpid/java/broker/build.xml3
-rw-r--r--qpid/java/broker/etc/config.xml4
-rw-r--r--qpid/java/broker/etc/virtualhosts.xml15
-rwxr-xr-x[-rw-r--r--]qpid/java/broker/src/main/java/broker.bnd (renamed from qpid/java/build.overrides)10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java30
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java52
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java99
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java84
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java18
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java21
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java29
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java15
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java14
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java68
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtilTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java10
-rw-r--r--qpid/java/build.deps2
-rw-r--r--qpid/java/build.xml2
-rwxr-xr-xqpid/java/client/src/main/java/client.bnd2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java23
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java12
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java85
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java14
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java18
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java70
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java179
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java20
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java63
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java265
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java27
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java8
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java11
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java22
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java4
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java17
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/filter/JMSSelectorFilterTest.java108
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java7
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java9
-rw-r--r--qpid/java/common.xml2
-rwxr-xr-xqpid/java/common/src/main/java/common.bnd2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java43
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java17
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java7
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java86
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java15
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java54
-rw-r--r--qpid/java/ivy.xml20
-rw-r--r--qpid/java/lib/poms/je-4.0.117.xml22
-rw-r--r--qpid/java/management/common/build.xml3
-rw-r--r--qpid/java/management/common/src/main/java/management-common.bnd2
-rw-r--r--qpid/java/management/eclipse-plugin/META-INF/MANIFEST.MF2
-rw-r--r--qpid/java/systests/etc/config-systests-firewall-2.xml4
-rw-r--r--qpid/java/systests/etc/config-systests-firewall-3.xml6
-rw-r--r--qpid/java/systests/etc/config-systests-settings.xml4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java12
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java14
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java276
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java116
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java3
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java13
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java5
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java179
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java3
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java14
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java67
-rwxr-xr-xqpid/java/test-profiles/CPPExcludes3
-rw-r--r--qpid/java/test-profiles/JavaPre010Excludes1
-rw-r--r--qpid/java/test-profiles/python_tests/Java010PythonExcludes133
90 files changed, 1917 insertions, 746 deletions
diff --git a/qpid/java/bdbstore/bin/backup.sh b/qpid/java/bdbstore/bin/backup.sh
index 0e2f0fda09..22d8d52b58 100755
--- a/qpid/java/bdbstore/bin/backup.sh
+++ b/qpid/java/bdbstore/bin/backup.sh
@@ -31,7 +31,7 @@ WHEREAMI=`dirname $0`
if [ -z "$QPID_HOME" ]; then
export QPID_HOME=`cd $WHEREAMI/../ && pwd`
fi
-VERSION=0.13
+VERSION=0.15
LIBS=$QPID_HOME/lib/je-4.0.103.jar:$QPID_HOME/lib/qpid-bdbstore-$VERSION.jar:$QPID_HOME/lib/qpid-all.jar
diff --git a/qpid/java/bdbstore/bin/storeUpgrade.sh b/qpid/java/bdbstore/bin/storeUpgrade.sh
index 076b9d3f7e..f3b089b3d3 100755
--- a/qpid/java/bdbstore/bin/storeUpgrade.sh
+++ b/qpid/java/bdbstore/bin/storeUpgrade.sh
@@ -36,7 +36,7 @@ if [ -z "$BDB_HOME" ]; then
export BDB_HOME=$(dirname $(dirname $(readlink -f $0)))
fi
-VERSION=0.13
+VERSION=0.15
LIBS=$BDB_HOME/lib/je-4.0.103.jar:$BDB_HOME/lib/qpid-bdbstore-$VERSION.jar:$QPID_HOME/lib/qpid-all.jar
diff --git a/qpid/java/bdbstore/build.xml b/qpid/java/bdbstore/build.xml
index 9355358e6c..9513e7cc5b 100644
--- a/qpid/java/bdbstore/build.xml
+++ b/qpid/java/bdbstore/build.xml
@@ -17,13 +17,14 @@
- under the License.
-->
<project name="bdbstore" default="build">
- <property name="module.depends" value="common client management/common broker perftests systests" />
- <property name="module.test.depends" value="test common/test broker/test management/common perftests systests" />
+ <property name="module.depends" value="common broker" />
+ <property name="module.test.depends" value="test client common/test broker/test management/common perftests systests" />
+ <property name="module.genpom" value="true"/>
<import file="../module.xml" />
<property name="bdb.lib.dir" value="${project.root}/lib/bdbstore" />
- <property name="bdb.version" value="4.0.103" />
+ <property name="bdb.version" value="4.0.117" />
<property name="bdb.download.url" value="http://download.oracle.com/maven/com/sleepycat/je/${bdb.version}/je-${bdb.version}.jar" />
<property name="bdb.jar.file" value="${bdb.lib.dir}/je-${bdb.version}.jar" />
@@ -80,5 +81,4 @@ http://www.oracle.com/technetwork/database/berkeleydb/downloads/jeoslicense-0868
<fileset dir="src/test/resources/upgrade"/>
</copy>
</target>
-
</project>
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java
index f6344b3d7d..f6344b3d7d 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java
diff --git a/qpid/java/broker-plugins/experimental/info/src/main/java/org/apache/qpid/info/AppInfo.java b/qpid/java/broker-plugins/experimental/info/src/main/java/org/apache/qpid/info/AppInfo.java
index bf3ef61ea1..c8e9805cd9 100644
--- a/qpid/java/broker-plugins/experimental/info/src/main/java/org/apache/qpid/info/AppInfo.java
+++ b/qpid/java/broker-plugins/experimental/info/src/main/java/org/apache/qpid/info/AppInfo.java
@@ -74,9 +74,9 @@ public class AppInfo
appInfoMap.put("port", sc.getPorts().toString());
appInfoMap.put("version", QpidProperties.getReleaseVersion());
appInfoMap.put("vhosts", "standalone");
- appInfoMap.put("KeystorePath", sc.getKeystorePath());
+ appInfoMap.put("KeystorePath", sc.getConnectorKeyStorePath());
appInfoMap.put("PluginDirectory", sc.getPluginDirectory());
- appInfoMap.put("CertType", sc.getCertType());
+ appInfoMap.put("CertType", sc.getConnectorCertType());
appInfoMap.put("QpidWork", sc.getQpidWork());
appInfoMap.put("Bind", sc.getBind());
}
diff --git a/qpid/java/broker-plugins/experimental/info/src/test/java/org/apache/qpid/info/systest/InfoPluginTest.java b/qpid/java/broker-plugins/experimental/info/src/test/java/org/apache/qpid/info/systest/InfoPluginTest.java
index 156c9eb138..348e860d5f 100644
--- a/qpid/java/broker-plugins/experimental/info/src/test/java/org/apache/qpid/info/systest/InfoPluginTest.java
+++ b/qpid/java/broker-plugins/experimental/info/src/test/java/org/apache/qpid/info/systest/InfoPluginTest.java
@@ -210,12 +210,13 @@ public class InfoPluginTest extends QpidBrokerTestCase
}
br.close();
System.out.println("*** Received buffer: " + buf);
- System.out.println("*** Latch countdown");
- _latch.countDown();
synchronized (_recv)
{
_recv.add(buf);
}
+
+ System.out.println("*** Latch countdown");
+ _latch.countDown();
}
catch (Exception ex)
{
diff --git a/qpid/java/broker-plugins/experimental/shutdown/src/main/java/shutdown.bnd b/qpid/java/broker-plugins/experimental/shutdown/src/main/java/shutdown.bnd
index f6ae79536d..c9e1371732 100755
--- a/qpid/java/broker-plugins/experimental/shutdown/src/main/java/shutdown.bnd
+++ b/qpid/java/broker-plugins/experimental/shutdown/src/main/java/shutdown.bnd
@@ -17,7 +17,7 @@
# under the License.
#
-ver: 0.13.0
+ver: 0.15.0
Bundle-SymbolicName: qpid-shutdown-plugin
Bundle-Version: ${ver}
diff --git a/qpid/java/broker/bin/qpid-server.bat b/qpid/java/broker/bin/qpid-server.bat
index af543decb3..6b7bbcb96e 100644
--- a/qpid/java/broker/bin/qpid-server.bat
+++ b/qpid/java/broker/bin/qpid-server.bat
@@ -65,7 +65,7 @@ if "%AMQJ_LOGGING_LEVEL%" == "" set AMQJ_LOGGING_LEVEL=info
REM Set the default system properties that we'll use now that they have
REM all been initialised
-set SYSTEM_PROPS=-Damqj.logging.level=%AMQJ_LOGGING_LEVEL% "-DQPID_HOME=%QPID_HOME%" "-DQPID_WORK=%QPID_WORK%"
+set SYSTEM_PROPS=-Damqj.logging.level=%AMQJ_LOGGING_LEVEL% -DQPID_HOME="%QPID_HOME%" -DQPID_WORK="%QPID_WORK%"
if "%EXTERNAL_CLASSPATH%" == "" set EXTERNAL_CLASSPATH=%CLASSPATH%
@@ -77,7 +77,7 @@ goto afterQpidClasspath
:noQpidClasspath
echo Warning: Qpid classpath not set. CLASSPATH set to %QPID_HOME%\lib\qpid-all.jar
-set CLASSPATH=%QPID_HOME%\lib\qpid-all.jar
+set CLASSPATH=%QPID_HOME%\lib\qpid-all.jar;%QPID_HOME%\lib\opt\*
:afterQpidClasspath
REM start parsing -run arguments
diff --git a/qpid/java/broker/build.xml b/qpid/java/broker/build.xml
index b15a7fd02e..6ea2b9a63e 100644
--- a/qpid/java/broker/build.xml
+++ b/qpid/java/broker/build.xml
@@ -19,10 +19,10 @@
-
-->
<project name="AMQ Broker" default="build">
-
<property name="module.depends" value="management/common common amqp-1-0-common"/>
<property name="module.test.depends" value="common/test" />
<property name="module.main" value="org.apache.qpid.server.Main"/>
+ <property name="module.genpom" value="true"/>
<import file="../module.xml"/>
@@ -84,4 +84,5 @@
<target name="release-bin" depends="release-bin-tasks"/>
+ <target name="bundle" depends="bundle-tasks"/>
</project>
diff --git a/qpid/java/broker/etc/config.xml b/qpid/java/broker/etc/config.xml
index 2752274155..d18e1392e6 100644
--- a/qpid/java/broker/etc/config.xml
+++ b/qpid/java/broker/etc/config.xml
@@ -35,8 +35,8 @@
<enabled>false</enabled>
<port>5671</port>
<sslOnly>false</sslOnly>
- <keystorePath>/path/to/keystore.ks</keystorePath>
- <keystorePassword>keystorepass</keystorePassword>
+ <keyStorePath>/path/to/keystore.ks</keyStorePath>
+ <keyStorePassword>keystorepass</keyStorePassword>
</ssl>
<port>5672</port>
<socketReceiveBuffer>262144</socketReceiveBuffer>
diff --git a/qpid/java/broker/etc/virtualhosts.xml b/qpid/java/broker/etc/virtualhosts.xml
index 33a48a1349..4dcdcda6d2 100644
--- a/qpid/java/broker/etc/virtualhosts.xml
+++ b/qpid/java/broker/etc/virtualhosts.xml
@@ -25,8 +25,9 @@
<name>localhost</name>
<localhost>
<store>
- <class>org.apache.qpid.server.store.MemoryMessageStore
- </class>
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+ <!--<class>org.apache.qpid.server.store.DerbyMessageStore</class>
+ <environment-path>${QPID_WORK}/derbystore</environment-path>-->
</store>
<housekeeping>
@@ -85,8 +86,9 @@
<name>development</name>
<development>
<store>
- <class>org.apache.qpid.server.store.MemoryMessageStore
- </class>
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+ <!--<class>org.apache.qpid.server.store.DerbyMessageStore</class>
+ <environment-path>${QPID_WORK}/derbystore</environment-path>-->
</store>
<queues>
@@ -123,8 +125,9 @@
<name>test</name>
<test>
<store>
- <class>org.apache.qpid.server.store.MemoryMessageStore
- </class>
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+ <!--<class>org.apache.qpid.server.store.DerbyMessageStore</class>
+ <environment-path>${QPID_WORK}/derbystore</environment-path>-->
</store>
<queues>
diff --git a/qpid/java/build.overrides b/qpid/java/broker/src/main/java/broker.bnd
index dbe05b4ec0..25b0495a63 100644..100755
--- a/qpid/java/build.overrides
+++ b/qpid/java/broker/src/main/java/broker.bnd
@@ -17,8 +17,10 @@
# under the License.
#
-# File to allow overriding default values of properties within the build system
-# without having to specify them on the command line on every build execution
+ver: 0.15.0
+
+Bundle-SymbolicName: qpid-broker
+Bundle-Version: ${ver}
+Export-Package: *;version=${ver}
+Bundle-RequiredExecutionEnvironment: J2SE-1.5
-# Override the setting for the optional modules to be built
-#modules.opt=bdbstore
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
index 9c77032d4c..6abef6fd6b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
@@ -694,7 +694,8 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
public BrokerSchema.BrokerClass.QueueMoveMessagesMethodResponseCommand queueMoveMessages(final BrokerSchema.BrokerClass.QueueMoveMessagesMethodResponseCommandFactory factory,
final String srcQueue,
final String destQueue,
- final Long qty)
+ final Long qty,
+ final Map filter) // TODO: move based on group identifier
{
// TODO
return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
@@ -712,6 +713,19 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
}
+ public BrokerSchema.BrokerClass.GetTimestampConfigMethodResponseCommand getTimestampConfig(final BrokerSchema.BrokerClass.GetTimestampConfigMethodResponseCommandFactory factory)
+ {
+ // TODO: timestamp support
+ return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+ }
+
+ public BrokerSchema.BrokerClass.SetTimestampConfigMethodResponseCommand setTimestampConfig(final BrokerSchema.BrokerClass.SetTimestampConfigMethodResponseCommandFactory factory,
+ final java.lang.Boolean receive)
+ {
+ // TODO: timestamp support
+ return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+ }
+
public BrokerSchema.BrokerClass.CreateMethodResponseCommand create(final BrokerSchema.BrokerClass.CreateMethodResponseCommandFactory factory,
final String type,
final String name,
@@ -731,6 +745,14 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
}
+ public BrokerSchema.BrokerClass.QueryMethodResponseCommand query(final BrokerSchema.BrokerClass.QueryMethodResponseCommandFactory factory,
+ final String type,
+ final String name)
+ {
+ //TODO:
+ return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+ }
+
public UUID getId()
{
return _obj.getId();
@@ -1102,7 +1124,8 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
}
public BrokerSchema.QueueClass.PurgeMethodResponseCommand purge(final BrokerSchema.QueueClass.PurgeMethodResponseCommandFactory factory,
- final Long request)
+ final Long request,
+ final Map filter) // TODO: support for purge-by-group-identifier
{
try
{
@@ -1118,7 +1141,8 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
public BrokerSchema.QueueClass.RerouteMethodResponseCommand reroute(final BrokerSchema.QueueClass.RerouteMethodResponseCommandFactory factory,
final Long request,
final Boolean useAltExchange,
- final String exchange)
+ final String exchange,
+ final Map filter) // TODO: support for re-route-by-group-identifier
{
//TODO
return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
index c337c4db75..5c1814590c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
@@ -218,9 +218,9 @@ public class Broker
if (serverConfig.getEnableSSL())
{
- final String keystorePath = serverConfig.getKeystorePath();
- final String keystorePassword = serverConfig.getKeystorePassword();
- final String certType = serverConfig.getCertType();
+ final String keystorePath = serverConfig.getConnectorKeyStorePath();
+ final String keystorePassword = serverConfig.getConnectorKeyStorePassword();
+ final String certType = serverConfig.getConnectorCertType();
final SSLContext sslContext = SSLContextFactory.buildServerContext(keystorePath, keystorePassword, certType);
for(int sslPort : sslPorts)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java
index 5cdb886821..7dffc2d3c0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java
@@ -21,6 +21,8 @@
package org.apache.qpid.server.configuration;
+import java.util.List;
+
public interface BrokerConfig extends ConfiguredObject<BrokerConfigType,BrokerConfig>
{
@@ -44,6 +46,19 @@ public interface BrokerConfig extends ConfiguredObject<BrokerConfigType,BrokerC
String getDataDirectory();
+ String getFederationTag();
+
+ /**
+ * List of feature(s) to be advertised to clients on connection.
+ * Feature names are strings, beginning with qpid. followed by more or more
+ * words separated by minus signs e.g. qpid.jms-selector.
+ *
+ * If there are no features, this method must return an empty array.
+ *
+ * @return list of feature names
+ */
+ List<String> getFeatures();
+
void addVirtualHost(VirtualHostConfig virtualHost);
void createBrokerConnection(String transport,
@@ -53,5 +68,4 @@ public interface BrokerConfig extends ConfiguredObject<BrokerConfigType,BrokerC
String authMechanism,
String username, String password);
- String getFederationTag();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java
index 82b2fc82d2..e1cf87277b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java
@@ -22,7 +22,6 @@
package org.apache.qpid.server.configuration;
import java.util.*;
-import java.io.File;
public final class BrokerConfigType extends ConfigObjectType<BrokerConfigType, BrokerConfig>
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index 9ca916a633..0d347873c2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -123,7 +123,7 @@ public class ServerConfiguration extends ConfigurationPlugin
* Configuration Manager to be initialised in the Application Registry.
* <p>
* If using this ServerConfiguration via an ApplicationRegistry there is no
- * need to explictly call {@link #initialise()} as this is done via the
+ * need to explicitly call {@link #initialise()} as this is done via the
* {@link ApplicationRegistry#initialise()} method.
*
* @param configurationURL
@@ -169,7 +169,7 @@ public class ServerConfiguration extends ConfigurationPlugin
* Configuration Manager to be initialised in the Application Registry.
* <p>
* If using this ServerConfiguration via an ApplicationRegistry there is no
- * need to explictly call {@link #initialise()} as this is done via the
+ * need to explicitly call {@link #initialise()} as this is done via the
* {@link ApplicationRegistry#initialise()} method.
*
* @param conf
@@ -239,6 +239,22 @@ public class ServerConfiguration extends ConfigurationPlugin
+ (_configFile == null ? "" : " Configuration file : " + _configFile);
throw new ConfigurationException(message);
}
+
+ // QPID-3517: Inconsistency in capitalisation in the SSL configuration keys used within the connector and management configuration
+ // sections. For the moment, continue to understand both but generate a deprecated warning if the less preferred keystore is used.
+ for (String key : new String[] {"management.ssl.keystorePath",
+ "management.ssl.keystorePassword," +
+ "connector.ssl.keystorePath",
+ "connector.ssl.keystorePassword"})
+ {
+ if (contains(key))
+ {
+ final String deprecatedXpath = key.replaceAll("\\.", "/");
+ final String preferredXpath = deprecatedXpath.replaceAll("keystore", "keyStore");
+ _logger.warn("Validation warning: " + deprecatedXpath + " is deprecated and must be replaced by " + preferredXpath
+ + (_configFile == null ? "" : " Configuration file : " + _configFile));
+ }
+ }
}
/*
@@ -404,7 +420,7 @@ public class ServerConfiguration extends ConfigurationPlugin
public final static Configuration flatConfig(File file) throws ConfigurationException
{
// We have to override the interpolate methods so that
- // interpolation takes place accross the entirety of the
+ // interpolation takes place across the entirety of the
// composite configuration. Without doing this each
// configuration object only interpolates variables defined
// inside itself.
@@ -551,7 +567,8 @@ public class ServerConfiguration extends ConfigurationPlugin
public String getManagementKeyStorePath()
{
- return getStringValue("management.ssl.keyStorePath");
+ final String fallback = getStringValue("management.ssl.keystorePath");
+ return getStringValue("management.ssl.keyStorePath", fallback);
}
public boolean getManagementSSLEnabled()
@@ -561,7 +578,8 @@ public class ServerConfiguration extends ConfigurationPlugin
public String getManagementKeyStorePassword()
{
- return getStringValue("management.ssl.keyStorePassword");
+ final String fallback = getStringValue("management.ssl.keystorePassword");
+ return getStringValue("management.ssl.keyStorePassword", fallback);
}
public boolean getQueueAutoRegister()
@@ -699,17 +717,19 @@ public class ServerConfiguration extends ConfigurationPlugin
return getListValue("connector.ssl.port", Collections.<Integer>singletonList(DEFAULT_SSL_PORT));
}
- public String getKeystorePath()
+ public String getConnectorKeyStorePath()
{
- return getStringValue("connector.ssl.keystorePath");
+ final String fallback = getStringValue("connector.ssl.keystorePath"); // pre-0.13 broker supported this name.
+ return getStringValue("connector.ssl.keyStorePath", fallback);
}
- public String getKeystorePassword()
+ public String getConnectorKeyStorePassword()
{
- return getStringValue("connector.ssl.keystorePassword");
+ final String fallback = getStringValue("connector.ssl.keystorePassword"); // pre-0.13 brokers supported this name.
+ return getStringValue("connector.ssl.keyStorePassword", fallback);
}
- public String getCertType()
+ public String getConnectorCertType()
{
return getStringValue("connector.ssl.certType", "SunX509");
}
@@ -773,4 +793,16 @@ public class ServerConfiguration extends ConfigurationPlugin
{
return getIntValue("maximumChannelCount", 256);
}
+
+ /**
+ * List of Broker features that have been disabled within configuration. Disabled
+ * features won't be advertised to the clients on connection.
+ *
+ * @return list of disabled features, or empty list if no features are disabled.
+ */
+ public List<String> getDisabledFeatures()
+ {
+ final List<String> disabledFeatures = getListValue("disabledFeatures", Collections.emptyList());
+ return disabledFeatures;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
index f21158cd0c..f330e2f708 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.federation;
+import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.ConnectionConfig;
@@ -252,7 +253,7 @@ public class BrokerLink implements LinkConfig, ConnectionListener
_qpidConnection.connect(_host, _port, _remoteVhost, _username, _password, "ssl".equals(_transport), _authMechanism);
final Map<String,Object> serverProps = _qpidConnection.getServerProperties();
- _remoteFederationTag = (String) serverProps.get("qpid.federation_tag");
+ _remoteFederationTag = (String) serverProps.get(ServerPropertyNames.FEDERATION_TAG);
if(_remoteFederationTag == null)
{
_remoteFederationTag = UUID.fromString(_transport+":"+_host+":"+_port).toString();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
index f16e30ef92..0e0b18aa2f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
@@ -30,20 +30,17 @@ import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.queue.AMQQueue;
-import java.util.concurrent.atomic.AtomicInteger;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
/**
* A deliverable message.
*/
-public class AMQMessage implements ServerMessage<AMQMessage>
+public class AMQMessage extends AbstractServerMessageImpl
{
/** Used for debugging purposes. */
private static final Logger _log = Logger.getLogger(AMQMessage.class);
- private final AtomicInteger _referenceCount = new AtomicInteger(0);
-
/** Flag to indicate that this message requires 'immediate' delivery. */
private static final byte IMMEDIATE = 0x01;
@@ -76,6 +73,8 @@ public class AMQMessage implements ServerMessage<AMQMessage>
public AMQMessage(StoredMessage<MessageMetaData> handle, WeakReference<AMQChannel> channelRef)
{
+ super(handle);
+
_handle = handle;
final MessageMetaData metaData = handle.getMetaData();
_size = metaData.getContentSize();
@@ -89,12 +88,6 @@ public class AMQMessage implements ServerMessage<AMQMessage>
_channelRef = channelRef;
}
-
- public String debugIdentity()
- {
- return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() + " Ref:" + _referenceCount.get() + ")";
- }
-
public void setExpiration(final long expiration)
{
@@ -102,11 +95,6 @@ public class AMQMessage implements ServerMessage<AMQMessage>
}
- public boolean isReferenced()
- {
- return _referenceCount.get() > 0;
- }
-
public MessageMetaData getMessageMetaData()
{
return _handle.getMetaData();
@@ -117,88 +105,12 @@ public class AMQMessage implements ServerMessage<AMQMessage>
return getMessageMetaData().getContentHeaderBody();
}
-
-
public Long getMessageId()
{
return _handle.getMessageNumber();
}
/**
- * Creates a long-lived reference to this message, and increments the count of such references, as an atomic
- * operation.
- */
- public AMQMessage takeReference()
- {
- incrementReference(); // _referenceCount.incrementAndGet();
-
- return this;
- }
-
- public boolean incrementReference()
- {
- return incrementReference(1);
- }
-
- /* Threadsafe. Increment the reference count on the message. */
- public boolean incrementReference(int count)
- {
-
- if(_referenceCount.addAndGet(count) <= 0)
- {
- _referenceCount.addAndGet(-count);
- return false;
- }
- else
- {
- return true;
- }
-
- }
-
- /**
- * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
- * message store.
- *
- *
- * @throws org.apache.qpid.server.queue.MessageCleanupException when an attempt was made to remove the message from the message store and that
- * failed
- */
- public void decrementReference()
- {
- int count = _referenceCount.decrementAndGet();
-
- // note that the operation of decrementing the reference count and then removing the message does not
- // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
- // the message has been passed to all queues. i.e. we are
- // not relying on the all the increments having taken place before the delivery manager decrements.
- if (count == 0)
- {
- // set the reference count way below 0 so that we can detect that the message has been deleted
- // this is to guard against the message being spontaneously recreated (from the mgmt console)
- // by copying from other queues at the same time as it is being removed.
- _referenceCount.set(Integer.MIN_VALUE/2);
-
- // must check if the handle is null since there may be cases where we decide to throw away a message
- // and the handle has not yet been constructed
- if (_handle != null)
- {
- _handle.remove();
-
- }
- }
- else
- {
- if (count < 0)
- {
- throw new RuntimeException("Reference count for message id " + debugIdentity()
- + " has gone below 0.");
- }
- }
- }
-
-
- /**
* Called selectors to determin if the message has already been sent
*
* @return _deliveredToConsumer
@@ -323,10 +235,7 @@ public class AMQMessage implements ServerMessage<AMQMessage>
public String toString()
{
- // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
- // _taken + " by :" + _takenBySubcription;
-
- return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + _referenceCount;
+ return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + getReferenceCount();
}
public int getContent(ByteBuffer buf, int offset)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
new file mode 100644
index 0000000000..186bb8601c
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
@@ -0,0 +1,84 @@
+package org.apache.qpid.server.message;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.server.store.StoredMessage;
+
+public abstract class AbstractServerMessageImpl implements ServerMessage
+{
+ private final AtomicInteger _referenceCount = new AtomicInteger(0);
+ private final StoredMessage<?> _handle;
+
+ public AbstractServerMessageImpl(StoredMessage<?> handle)
+ {
+ _handle = handle;
+ }
+
+ public boolean incrementReference()
+ {
+ return incrementReference(1);
+ }
+
+ public boolean incrementReference(int count)
+ {
+ if(_referenceCount.addAndGet(count) <= 0)
+ {
+ _referenceCount.addAndGet(-count);
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ /**
+ * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
+ * message store.
+ *
+ *
+ * @throws org.apache.qpid.server.queue.MessageCleanupException when an attempt was made to remove the message from the message store and that
+ * failed
+ */
+ public void decrementReference()
+ {
+ int count = _referenceCount.decrementAndGet();
+
+ // note that the operation of decrementing the reference count and then removing the message does not
+ // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
+ // the message has been passed to all queues. i.e. we are
+ // not relying on the all the increments having taken place before the delivery manager decrements.
+ if (count == 0)
+ {
+ // set the reference count way below 0 so that we can detect that the message has been deleted
+ // this is to guard against the message being spontaneously recreated (from the mgmt console)
+ // by copying from other queues at the same time as it is being removed.
+ _referenceCount.set(Integer.MIN_VALUE/2);
+
+ // must check if the handle is null since there may be cases where we decide to throw away a message
+ // and the handle has not yet been constructed
+ if (_handle != null)
+ {
+ _handle.remove();
+ }
+ }
+ else
+ {
+ if (count < 0)
+ {
+ throw new RuntimeException("Reference count for message id " + debugIdentity()
+ + " has gone below 0.");
+ }
+ }
+ }
+
+ public String debugIdentity()
+ {
+ return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageNumber() + " Ref:" + getReferenceCount() + ")";
+ }
+
+ protected int getReferenceCount()
+ {
+ return _referenceCount.get();
+ }
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
index 2297e4200d..f9863f4945 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
@@ -44,7 +44,7 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes
private int _bodySize;
private volatile SoftReference<ByteBuffer> _body;
- private static final int ENCODER_SIZE = 1 << 16;
+ private static final int ENCODER_SIZE = 1 << 10;
public static final MessageMetaDataType.Factory<MessageMetaData_0_10> FACTORY = new MetaDataFactory();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
index af78042a63..1a230a2590 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
@@ -29,18 +29,14 @@ import java.nio.ByteBuffer;
import java.lang.ref.WeakReference;
-public class MessageTransferMessage implements InboundMessage, ServerMessage<MessageTransferMessage>
+public class MessageTransferMessage extends AbstractServerMessageImpl implements InboundMessage
{
-
-
private StoredMessage<MessageMetaData_0_10> _storeMessage;
-
-
private WeakReference<Session> _sessionRef;
public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, WeakReference<Session> sessionRef)
{
-
+ super(storeMessage);
_storeMessage = storeMessage;
_sessionRef = sessionRef;
}
@@ -145,5 +141,4 @@ public class MessageTransferMessage implements InboundMessage, ServerMessage<Mes
{
return _sessionRef == null ? null : (ServerSession) _sessionRef.get();
}
-
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java
index ed189c49c4..cb44f80b91 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java
@@ -29,11 +29,11 @@ public class TransferMessageReference extends MessageReference<MessageTransferMe
protected void onReference(MessageTransferMessage message)
{
-
+ message.incrementReference();
}
protected void onRelease(MessageTransferMessage message)
{
-
+ message.decrementReference();
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
index b51e6aff1a..bc0d4e3bcc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
@@ -72,5 +72,5 @@ public interface AMQConnectionModel extends StatisticsGatherer
public String getUserName();
- public boolean isSessionNameUnique(String name);
+ public boolean isSessionNameUnique(byte[] name);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index bff0a79de1..b960ce8608 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -1396,8 +1396,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
}
@Override
- public boolean isSessionNameUnique(String name)
+ public boolean isSessionNameUnique(byte[] name)
{
+ // 0-8/0-9/0-9-1 sessions don't have names
return true;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
index 108533ef96..6a36b22400 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
@@ -23,7 +23,10 @@ package org.apache.qpid.server.registry;
import org.apache.qpid.server.configuration.*;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.common.QpidProperties;
+import org.apache.qpid.common.ServerPropertyNames;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.UUID;
import java.util.List;
import java.util.Map;
@@ -158,4 +161,19 @@ public class BrokerConfigAdapter implements BrokerConfig
{
return _federationTag;
}
+
+ /**
+ * @see org.apache.qpid.server.configuration.BrokerConfig#getFeatures()
+ */
+ @Override
+ public List<String> getFeatures()
+ {
+ final List<String> features = new ArrayList<String>();
+ if (!_instance.getConfiguration().getDisabledFeatures().contains(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR))
+ {
+ features.add(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR);
+ }
+
+ return Collections.unmodifiableList(features);
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 92f21b8b1c..adb0a84151 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -460,7 +460,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public void queueDeleted(AMQQueue queue)
{
_deleted.set(true);
-// _channel.queueDeleted(queue);
}
public boolean filtersMessages()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index d83013afba..e18b453db3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -259,10 +259,11 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
public void close(AMQConstant cause, String message) throws AMQException
{
+ closeSubscriptions();
ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
try
{
- replyCode = ConnectionCloseCode.get(cause.getCode());
+ replyCode = ConnectionCloseCode.get(cause.getCode());
}
catch (IllegalArgumentException iae)
{
@@ -389,7 +390,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
}
@Override
- public boolean isSessionNameUnique(String name)
+ public boolean isSessionNameUnique(byte[] name)
{
return !super.hasSessionWithName(name);
}
@@ -399,4 +400,20 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
{
return _authorizedPrincipal.getName();
}
+
+ @Override
+ public void closed()
+ {
+ closeSubscriptions();
+ super.closed();
+ }
+
+ private void closeSubscriptions()
+ {
+ for (Session ssn : getChannels())
+ {
+ ((ServerSession)ssn).unregisterSubscriptions();
+ }
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
index 2de8a0425e..8d6e0e0d80 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
@@ -32,7 +32,9 @@ import java.util.StringTokenizer;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
+import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.server.configuration.BrokerConfig;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -58,15 +60,14 @@ import org.apache.qpid.transport.SessionDetached;
public class ServerConnectionDelegate extends ServerDelegate
{
- private String _localFQDN;
+ private final String _localFQDN;
private final IApplicationRegistry _appRegistry;
public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN)
{
- this(new HashMap<String,Object>(Collections.singletonMap("qpid.federation_tag",appRegistry.getBroker().getFederationTag())), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN);
+ this(createConnectionProperties(appRegistry.getBroker()), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN);
}
-
public ServerConnectionDelegate(Map<String, Object> properties,
List<Object> locales,
IApplicationRegistry appRegistry,
@@ -78,6 +79,18 @@ public class ServerConnectionDelegate extends ServerDelegate
_localFQDN = localFQDN;
}
+ private static Map<String, Object> createConnectionProperties(final BrokerConfig brokerConfig)
+ {
+ final Map<String,Object> map = new HashMap<String,Object>(2);
+ map.put(ServerPropertyNames.FEDERATION_TAG, brokerConfig.getFederationTag());
+ final List<String> features = brokerConfig.getFeatures();
+ if (features != null && features.size() > 0)
+ {
+ map.put(ServerPropertyNames.QPID_FEATURES, features);
+ }
+ return map;
+ }
+
private static List<Object> parseToList(String mechanisms)
{
List<Object> list = new ArrayList<Object>();
@@ -234,22 +247,22 @@ public class ServerConnectionDelegate extends ServerDelegate
@Override
public void sessionAttach(final Connection conn, final SessionAttach atc)
{
- final String clientId = new String(atc.getName());
- final Session ssn = getSession(conn, atc);
+ final Session ssn;
- if(isSessionNameUnique(clientId,conn))
+ if(isSessionNameUnique(atc.getName(), conn))
{
+ ssn = sessionAttachImpl(conn, atc);
conn.registerSession(ssn);
- super.sessionAttach(conn, atc);
}
else
{
+ ssn = getSession(conn, atc);
ssn.invoke(new SessionDetached(atc.getName(), SessionDetachCode.SESSION_BUSY));
ssn.closed();
}
}
- private boolean isSessionNameUnique(final String name, final Connection conn)
+ private boolean isSessionNameUnique(final byte[] name, final Connection conn)
{
final ServerConnection sconn = (ServerConnection) conn;
final String userId = sconn.getUserName();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 28dec2ad28..429cc4cf66 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -53,6 +53,7 @@ import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -172,6 +173,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
public void postCommit()
{
+ MessageReference<?> ref = message.newReference();
for(int i = 0; i < _queues.length; i++)
{
try
@@ -184,6 +186,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
throw new RuntimeException(e);
}
}
+ ref.release();
}
public void onRollback()
@@ -412,12 +415,11 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
{
queue.unregisterSubscription(sub);
}
-
}
catch (AMQException e)
{
// TODO
- _logger.error("Failed to unregister subscription", e);
+ _logger.error("Failed to unregister subscription :" + e.getMessage(), e);
}
finally
{
@@ -683,12 +685,17 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
{
// unregister subscriptions in order to prevent sending of new messages
// to subscriptions with closing session
+ unregisterSubscriptions();
+
+ super.close();
+ }
+
+ void unregisterSubscriptions()
+ {
final Collection<Subscription_0_10> subscriptions = getSubscriptions();
for (Subscription_0_10 subscription_0_10 : subscriptions)
{
unregister(subscription_0_10);
}
-
- super.close();
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 17bd06538f..c87919b478 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -1261,11 +1261,10 @@ public class ServerSessionDelegate extends SessionDelegate
{
setThreadSubject(session);
- for(Subscription_0_10 sub : getSubscriptions(session))
- {
- ((ServerSession)session).unregister(sub);
- }
- ((ServerSession)session).onClose();
+ ServerSession serverSession = (ServerSession)session;
+
+ serverSession.unregisterSubscriptions();
+ serverSession.onClose();
}
@Override
@@ -1274,11 +1273,6 @@ public class ServerSessionDelegate extends SessionDelegate
closed(session);
}
- public Collection<Subscription_0_10> getSubscriptions(Session session)
- {
- return ((ServerSession)session).getSubscriptions();
- }
-
private void setThreadSubject(Session session)
{
final ServerConnection scon = (ServerConnection) session.getConnection();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
index d368a2d1ee..9afd2a45a9 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
@@ -256,7 +256,7 @@ public class ServerConfigurationTest extends QpidTestCase
assertEquals(false, _serverConfig.getManagementSSLEnabled());
}
- public void testGetManagementKeyStorePassword() throws ConfigurationException
+ public void testGetManagementKeystorePassword() throws ConfigurationException
{
// Check default
_serverConfig.initialise();
@@ -534,43 +534,57 @@ public class ServerConfigurationTest extends QpidTestCase
assertEquals("10", _serverConfig.getSSLPorts().get(0));
}
- public void testGetKeystorePath() throws ConfigurationException
+ public void testGetConnectorKeystorePath() throws ConfigurationException
{
// Check default
_serverConfig.initialise();
- assertNull(_serverConfig.getKeystorePath());
+ assertNull(_serverConfig.getConnectorKeyStorePath());
// Check value we set
- _config.setProperty("connector.ssl.keystorePath", "a");
+ _config.setProperty("connector.ssl.keyStorePath", "a");
_serverConfig = new ServerConfiguration(_config);
_serverConfig.initialise();
- assertEquals("a", _serverConfig.getKeystorePath());
+ assertEquals("a", _serverConfig.getConnectorKeyStorePath());
+
+ // Ensure we continue to support the old name keystorePath
+ _config.clearProperty("connector.ssl.keyStorePath");
+ _config.setProperty("connector.ssl.keystorePath", "b");
+ _serverConfig = new ServerConfiguration(_config);
+ _serverConfig.initialise();
+ assertEquals("b", _serverConfig.getConnectorKeyStorePath());
}
- public void testGetKeystorePassword() throws ConfigurationException
+ public void testGetConnectorKeystorePassword() throws ConfigurationException
{
// Check default
_serverConfig.initialise();
- assertNull(_serverConfig.getKeystorePassword());
+ assertNull(_serverConfig.getConnectorKeyStorePassword());
// Check value we set
- _config.setProperty("connector.ssl.keystorePassword", "a");
+ _config.setProperty("connector.ssl.keyStorePassword", "a");
+ _serverConfig = new ServerConfiguration(_config);
+ _serverConfig.initialise();
+ assertEquals("a", _serverConfig.getConnectorKeyStorePassword());
+
+ // Ensure we continue to support the old name keystorePassword
+ _config.clearProperty("connector.ssl.keyStorePassword");
+ _config.setProperty("connector.ssl.keystorePassword", "b");
_serverConfig = new ServerConfiguration(_config);
_serverConfig.initialise();
- assertEquals("a", _serverConfig.getKeystorePassword());
+ assertEquals("b", _serverConfig.getConnectorKeyStorePassword());
}
- public void testGetCertType() throws ConfigurationException
+ public void testGetConnectorCertType() throws ConfigurationException
{
// Check default
_serverConfig.initialise();
- assertEquals("SunX509", _serverConfig.getCertType());
+ assertEquals("SunX509", _serverConfig.getConnectorCertType());
// Check value we set
_config.setProperty("connector.ssl.certType", "a");
_serverConfig = new ServerConfiguration(_config);
_serverConfig.initialise();
- assertEquals("a", _serverConfig.getCertType());
+ assertEquals("a", _serverConfig.getConnectorCertType());
}
public void testGetUseBiasedWrites() throws ConfigurationException
@@ -1284,7 +1298,7 @@ public class ServerConfigurationTest extends QpidTestCase
}
/**
- * Test that a non-existant virtualhost file throws a {@link ConfigurationException}.
+ * Test that a non-existent virtualhost file throws a {@link ConfigurationException}.
* <p>
* Test for QPID-2624
*/
@@ -1312,7 +1326,27 @@ public class ServerConfigurationTest extends QpidTestCase
}
}
- /*
+ /**
+ * Tests that element disabledFeatures allows features that would
+ * otherwise be advertised by the broker to be turned off.
+ */
+ public void testDisabledFeatures() throws ConfigurationException
+ {
+ // Check default
+ _serverConfig.initialise();
+ _serverConfig = new ServerConfiguration(_config);
+ assertEquals("Unexpected size", 0, _serverConfig.getDisabledFeatures().size());
+
+ // Check value we set
+ _config.addProperty("disabledFeatures", "qpid.feature1");
+ _config.addProperty("disabledFeatures", "qpid.feature2");
+ _serverConfig = new ServerConfiguration(_config);
+
+ assertEquals("Unexpected size",2, _serverConfig.getDisabledFeatures().size());
+ assertTrue("Unexpected contents", _serverConfig.getDisabledFeatures().contains("qpid.feature1"));
+ }
+
+ /**
* Tests that the old element security.jmx.access (that used to be used
* to define JMX access rights) is rejected.
*/
@@ -1338,7 +1372,7 @@ public class ServerConfigurationTest extends QpidTestCase
}
}
- /*
+ /**
* Tests that the old element security.jmx.principal-database (that used to define the
* principal database used for JMX authentication) is rejected.
*/
@@ -1364,7 +1398,7 @@ public class ServerConfigurationTest extends QpidTestCase
}
}
- /*
+ /**
* Tests that the old element security.principal-databases. ... (that used to define
* principal databases) is rejected.
*/
@@ -1389,7 +1423,7 @@ public class ServerConfigurationTest extends QpidTestCase
}
}
- /*
+ /**
* Tests that the old element housekeeping.expiredMessageCheckPeriod. ... (that was
* replaced by housekeeping.checkPeriod) is rejected.
*/
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtilTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtilTest.java
index 4a03445357..f2249c5931 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtilTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtilTest.java
@@ -72,11 +72,11 @@ public class OsgiSystemPackageUtilTest extends QpidTestCase
_map.put("org.apache.qpid.xyz", "1.0.0");
_map.put("org.abc", "1.2.3");
- _util = new OsgiSystemPackageUtil(new Version("0.13"), _map);
+ _util = new OsgiSystemPackageUtil(new Version("0.15"), _map);
final String systemPackageString = _util.getFormattedSystemPackageString();
- assertEquals("org.abc; version=1.2.3, org.apache.qpid.xyz; version=0.13.0", systemPackageString);
+ assertEquals("org.abc; version=1.2.3, org.apache.qpid.xyz; version=0.15.0", systemPackageString);
}
public void testWithQpidPackageWithoutQpidReleaseNumberSet() throws Exception
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
index 2d41eb9899..2ffa157ca8 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
@@ -86,11 +86,7 @@ public class ReferenceCountingTest extends QpidTestCase
AMQMessage message = new AMQMessage(storedMessage);
- message = message.takeReference();
-
- // we call routing complete to set up the handle
- // message.routingComplete(_store, _storeContext, new MessageHandleFactory());
-
+ message.incrementReference();
assertEquals(1, _store.getMessageCount());
message.decrementReference();
@@ -146,12 +142,12 @@ public class ReferenceCountingTest extends QpidTestCase
AMQMessage message = new AMQMessage(storedMessage);
- message = message.takeReference();
+ message.incrementReference();
// we call routing complete to set up the handle
// message.routingComplete(_store, _storeContext, new MessageHandleFactory());
assertEquals(1, _store.getMessageCount());
- message = message.takeReference();
+ message.incrementReference();
message.decrementReference();
assertEquals(1, _store.getMessageCount());
}
diff --git a/qpid/java/build.deps b/qpid/java/build.deps
index 15844d7a42..17f963ece7 100644
--- a/qpid/java/build.deps
+++ b/qpid/java/build.deps
@@ -139,6 +139,6 @@ management-eclipse-plugin.test.libs=${test.libs}
management-common.test.libs=${test.libs}
# optional bdbstore module deps
-bdb-je=lib/bdbstore/je-4.0.103.jar
+bdb-je=lib/bdbstore/je-4.0.117.jar
bdbstore.libs=${bdb-je}
bdbstore.test.libs=${test.libs}
diff --git a/qpid/java/build.xml b/qpid/java/build.xml
index 4400626370..e6c154d3d0 100644
--- a/qpid/java/build.xml
+++ b/qpid/java/build.xml
@@ -22,8 +22,6 @@
<import file="common.xml"/>
- <property file="${project.root}/build.overrides"/>
-
<findSubProjects name="broker-plugins" dir="broker-plugins"/>
<findSubProjects name="client-plugins" dir="client-plugins"/>
<findSubProjects name="management" dir="management" excludes="common,example"/>
diff --git a/qpid/java/client/src/main/java/client.bnd b/qpid/java/client/src/main/java/client.bnd
index 98696dc7d8..bd13844e8e 100755
--- a/qpid/java/client/src/main/java/client.bnd
+++ b/qpid/java/client/src/main/java/client.bnd
@@ -17,7 +17,7 @@
# under the License.
#
-ver: 0.13.0
+ver: 0.15.0
Bundle-SymbolicName: qpid-client
Bundle-Version: ${ver}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index f15af72407..ad7885f195 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -175,6 +175,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// new amqp-0-10 encoded format.
private boolean _useLegacyMapMessageFormat;
+ //used to track the last failover time for
+ //Address resolution purposes
+ private volatile long _lastFailoverTime = 0;
+
/**
* @param broker brokerdetails
* @param username username
@@ -1076,6 +1080,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
*/
public boolean firePreFailover(boolean redirect)
{
+ _lastFailoverTime = System.currentTimeMillis();
boolean proceed = true;
if (_connectionListener != null)
{
@@ -1397,6 +1402,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return null;
}
}
+
+ /**
+ * Tests whether the Broker has advertised support for the named feature.
+ *
+ * @param featureName
+ *
+ * @return true if the feature is supported, or false otherwise.
+ */
+ boolean isSupportedServerFeature(final String featureName)
+ {
+ return _delegate.isSupportedServerFeature(featureName);
+ }
+
public boolean isFailingOver()
{
return (_protocolHandler.getFailoverLatch() != null);
@@ -1462,4 +1480,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
}
+
+ public long getLastFailoverTime()
+ {
+ return _lastFailoverTime;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index 8768f93c8c..afb0e45f7a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -65,4 +65,16 @@ public interface AMQConnectionDelegate
ProtocolVersion getProtocolVersion();
boolean verifyClientID() throws JMSException, AMQException;
+
+ /**
+ * Tests whether the server has advertised support for the specified feature
+ * via the qpid.features server connection property. By convention the feature name
+ * with begin <code>qpid.</code> followed by one or more words separated by minus signs
+ * e.g. qpid.jms-selector.
+ *
+ * @param featureName name of feature.
+ *
+ * @return true if the feature is supported by the server
+ */
+ boolean isSupportedServerFeature(final String featureName);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 0ed3db6ecb..5e4f84ce9f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -1,4 +1,3 @@
-package org.apache.qpid.client;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,6 +19,7 @@ package org.apache.qpid.client;
*
*/
+package org.apache.qpid.client;
import java.io.IOException;
import java.util.ArrayList;
@@ -36,6 +36,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.transport.ClientConnectionDelegate;
+import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
@@ -63,16 +64,12 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_0_10.class);
/**
- * The name of the UUID property
- */
- private static final String UUID_NAME = "qpid.federation_tag";
- /**
* The AMQ Connection.
*/
- private AMQConnection _conn;
+ private final AMQConnection _conn;
/**
- * The QpidConeection instance that is mapped with thie JMS connection.
+ * The QpidConeection instance that is mapped with this JMS connection.
*/
org.apache.qpid.transport.Connection _qpidConnection;
private ConnectionException exception = null;
@@ -281,24 +278,29 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
_conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1));
- try
+ _qpidConnection.notifyFailoverRequired();
+
+ synchronized (_conn.getFailoverMutex())
{
- if (_conn.firePreFailover(false) && _conn.attemptReconnection())
+ try
{
- _conn.failoverPrep();
- _conn.resubscribeSessions();
- _conn.fireFailoverComplete();
- return;
+ if (_conn.firePreFailover(false) && _conn.attemptReconnection())
+ {
+ _conn.failoverPrep();
+ _conn.resubscribeSessions();
+ _conn.fireFailoverComplete();
+ return;
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error("error during failover", e);
+ }
+ finally
+ {
+ _conn.getProtocolHandler().getFailoverLatch().countDown();
+ _conn.getProtocolHandler().setFailoverLatch(null);
}
- }
- catch (Exception e)
- {
- _logger.error("error during failover", e);
- }
- finally
- {
- _conn.getProtocolHandler().getFailoverLatch().countDown();
- _conn.getProtocolHandler().setFailoverLatch(null);
}
}
@@ -324,6 +326,18 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
{
+ if (_conn.isFailingOver())
+ {
+ try
+ {
+ _conn.blockUntilNotFailingOver();
+ }
+ catch (InterruptedException e)
+ {
+ //ignore
+ }
+ }
+
try
{
return operation.execute();
@@ -352,7 +366,32 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
public String getUUID()
{
- return (String)_qpidConnection.getServerProperties().get(UUID_NAME);
+ return (String)_qpidConnection.getServerProperties().get(ServerPropertyNames.FEDERATION_TAG);
+ }
+
+ /*
+ * @see org.apache.qpid.client.AMQConnectionDelegate#isSupportedServerFeature(java.lang.String)
+ */
+ public boolean isSupportedServerFeature(final String featureName)
+ {
+ if (featureName == null)
+ {
+ throw new IllegalArgumentException("featureName cannot be null");
+ }
+ final Map<String, Object> serverProperties = _qpidConnection.getServerProperties();
+ boolean featureSupported = false;
+ if (serverProperties != null && serverProperties.containsKey(ServerPropertyNames.QPID_FEATURES))
+ {
+ final Object supportServerFeatures = serverProperties.get(ServerPropertyNames.QPID_FEATURES);
+ featureSupported = supportServerFeatures instanceof List && ((List<String>)supportServerFeatures).contains(featureName);
+ }
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Server support for feature '" + featureName + "' : " + featureSupported);
+ }
+
+ return featureSupported;
}
private ConnectionSettings retriveConnectionSettings(BrokerDetails brokerDetail)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index b1a22155d6..bff4df0e93 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.net.ConnectException;
import java.nio.channels.UnresolvedAddressException;
import java.security.GeneralSecurityException;
-import java.security.Security;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.EnumSet;
@@ -44,6 +43,7 @@ import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateWaiter;
+import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.framing.BasicQosBody;
import org.apache.qpid.framing.BasicQosOkBody;
import org.apache.qpid.framing.ChannelOpenBody;
@@ -66,7 +66,7 @@ import org.slf4j.LoggerFactory;
public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class);
- private AMQConnection _conn;
+ private final AMQConnection _conn;
public void closeConnection(long timeout) throws JMSException, AMQException
@@ -379,4 +379,14 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
return true;
}
+
+ /*
+ * @see org.apache.qpid.client.AMQConnectionDelegate#isSupportedServerFeature(java.lang.String)
+ */
+ public boolean isSupportedServerFeature(String featureName)
+ {
+ // The Qpid Java Broker 0-8..0-9-1 does not advertise features by the qpid.features property, so for now
+ // we just hardcode JMS selectors as supported.
+ return ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR.equals(featureName);
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index acd46da11a..f9a38138ba 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
import java.net.URISyntaxException;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Destination;
import javax.naming.NamingException;
@@ -59,7 +60,7 @@ public abstract class AMQDestination implements Destination, Referenceable
private boolean _browseOnly;
- private boolean _isAddressResolved;
+ private AtomicLong _addressResolved = new AtomicLong(0);
private AMQShortString _queueName;
@@ -77,7 +78,7 @@ public abstract class AMQDestination implements Destination, Referenceable
public static final int QUEUE_TYPE = 1;
public static final int TOPIC_TYPE = 2;
public static final int UNKNOWN_TYPE = 3;
-
+
// ----- Fields required to support new address syntax -------
public enum DestSyntax {
@@ -740,12 +741,12 @@ public abstract class AMQDestination implements Destination, Referenceable
public boolean isAddressResolved()
{
- return _isAddressResolved;
+ return _addressResolved.get() > 0;
}
- public void setAddressResolved(boolean addressResolved)
+ public void setAddressResolved(long addressResolved)
{
- _isAddressResolved = addressResolved;
+ _addressResolved.set(addressResolved);
}
private static Address createAddressFromString(String str)
@@ -823,7 +824,7 @@ public abstract class AMQDestination implements Destination, Referenceable
dest.setTargetNode(_targetNode);
dest.setSourceNode(_sourceNode);
dest.setLink(_link);
- dest.setAddressResolved(_isAddressResolved);
+ dest.setAddressResolved(_addressResolved.get());
return dest;
}
@@ -836,4 +837,9 @@ public abstract class AMQDestination implements Destination, Referenceable
{
_isDurable = b;
}
+
+ public boolean isResolvedAfter(long time)
+ {
+ return _addressResolved.get() > time;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 44c4e8987a..ef44221ec1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -89,9 +89,9 @@ import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
@@ -308,7 +308,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
protected final FlowControllingBlockingQueue _queue;
/** Holds the highest received delivery tag. */
- private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
+ protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
private final AtomicLong _rollbackMark = new AtomicLong(-1);
/** All the not yet acknowledged message tags */
@@ -534,7 +534,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
_queue = new FlowControllingBlockingQueue(_prefetchHighMark, null);
}
-
+
// Add creation logging to tie in with the existing close logging
if (_logger.isInfoEnabled())
{
@@ -856,6 +856,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
//Check that we are clean to commit.
if (_failedOverDirty)
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Session " + _channelId + " was dirty whilst failing over. Rolling back.");
+ }
rollback();
throw new TransactionRolledBackException("Connection failover has occured with uncommitted transaction activity." +
@@ -890,7 +894,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
C consumer = _consumers.get(consumerTag);
if (consumer != null)
{
- if (!consumer.isNoConsume()) // Normal Consumer
+ if (!consumer.isBrowseOnly()) // Normal Consumer
{
// Clean the Maps up first
// Flush any pending messages for this consumerTag
@@ -1092,7 +1096,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// possible to determine when querying the broker whether there are no arguments or just a non-matching selector
// argument, as specifying null for the arguments when querying means they should not be checked at all
args.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector);
-
+
// if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec
// says we must trash the subscription.
boolean isQueueBound = isQueueBound(dest.getExchangeName(), dest.getAMQQueueName());
@@ -1814,9 +1818,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
suspendChannel(true);
}
- // Let the dispatcher know that all the incomming messages
- // should be rolled back(reject/release)
- _rollbackMark.set(_highestDeliveryTag.get());
+ setRollbackMark();
syncDispatchQueue();
@@ -2008,28 +2010,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
AMQDestination amqd = (AMQDestination) destination;
- // TODO: Define selectors in AMQP
- // TODO: construct the rawSelector from the selector string if rawSelector == null
- final FieldTable ft = FieldTableFactory.newFieldTable();
- // if (rawSelector != null)
- // ft.put("headers", rawSelector.getDataAsBytes());
- // rawSelector is used by HeadersExchange and is not a JMS Selector
- if (rawSelector != null)
- {
- ft.addAll(rawSelector);
- }
-
- // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a
- // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise
- // possible to determine when querying the broker whether there are no arguments or just a non-matching selector
- // argument, as specifying null for the arguments when querying means they should not be checked at all
- ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
-
C consumer;
try
{
consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
- noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
+ noLocal, exclusive, messageSelector, rawSelector, noConsume, autoClose);
}
catch(TransportException e)
{
@@ -2570,7 +2555,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* @param queueName
*/
private void consumeFromQueue(C consumer, AMQShortString queueName,
- AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
+ AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector) throws AMQException, FailoverException
{
int tagId = _nextTag++;
@@ -2598,7 +2583,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
public abstract void sendConsume(C consumer, AMQShortString queueName,
- AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException;
+ AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector, int tag) throws AMQException, FailoverException;
private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate)
throws JMSException
@@ -2923,7 +2908,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
- consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer._messageSelector);
+ consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelectorFilter());
}
catch (FailoverException e)
{
@@ -3202,13 +3187,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
setConnectionStopped(true);
}
- _rollbackMark.set(_highestDeliveryTag.get());
+ setRollbackMark();
_dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
for (C consumer : _consumers.values())
{
- if (!consumer.isNoConsume())
+ if (!consumer.isBrowseOnly())
{
consumer.rollback();
}
@@ -3351,6 +3336,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (!(message instanceof CloseConsumerMessage)
&& tagLE(deliveryTag, _rollbackMark.get()))
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting message because delivery tag " + deliveryTag
+ + " <= rollback mark " + _rollbackMark.get());
+ }
rejectMessage(message, true);
}
else if (_usingDispatcherForCleanup)
@@ -3390,7 +3380,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
else
{
- if (consumer.isNoConsume())
+ if (consumer.isBrowseOnly())
{
_dispatcherLogger.info("Received a message("
+ System.identityHashCode(message) + ")" + "["
@@ -3412,6 +3402,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// Don't reject if we're already closing
if (!_closed.get())
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting message with delivery tag " + message.getDeliveryTag()
+ + " for closing consumer " + String.valueOf(consumer == null? null: consumer._consumerTag));
+ }
rejectMessage(message, true);
}
}
@@ -3542,4 +3537,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
return ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly());
}
+
+ private void setRollbackMark()
+ {
+ // Let the dispatcher know that all the incomming messages
+ // should be rolled back(reject/release)
+ _rollbackMark.set(_highestDeliveryTag.get());
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rollback mark is set to " + _rollbackMark.get());
+ }
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 86e1fc08de..7e5edef38d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -53,6 +53,7 @@ import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
@@ -74,6 +75,7 @@ import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.SessionListener;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.Serial;
+import org.apache.qpid.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -294,23 +296,34 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
- void messageAcknowledge(RangeSet ranges, boolean accept)
+ void messageAcknowledge(final RangeSet ranges, final boolean accept)
{
messageAcknowledge(ranges,accept,false);
}
- void messageAcknowledge(RangeSet ranges, boolean accept,boolean setSyncBit)
+ void messageAcknowledge(final RangeSet ranges, final boolean accept, final boolean setSyncBit)
{
- Session ssn = getQpidSession();
- for (Range range : ranges)
+ final Session ssn = getQpidSession();
+ flushProcessed(ranges,accept);
+ if (accept)
{
- ssn.processed(range);
+ ssn.messageAccept(ranges, UNRELIABLE, setSyncBit ? SYNC : NONE);
}
- ssn.flushProcessed(accept ? BATCH : NONE);
- if (accept)
+ }
+
+ /**
+ * Flush any outstanding commands. This causes session complete to be sent.
+ * @param ranges the range of command ids.
+ * @param batch true if batched.
+ */
+ void flushProcessed(final RangeSet ranges, final boolean batch)
+ {
+ final Session ssn = getQpidSession();
+ for (final Range range : ranges)
{
- ssn.messageAccept(ranges, UNRELIABLE,setSyncBit? SYNC : NONE);
+ ssn.processed(range);
}
+ ssn.flushProcessed(batch ? BATCH : NONE);
}
/**
@@ -364,7 +377,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
_logger.debug("Binding queue : " + queue +
" exchange: " + exchange +
" using binding key " + binding.getBindingKey() +
- " with args " + printMap(binding.getArgs()));
+ " with args " + Strings.printMap(binding.getArgs()));
getQpidSession().exchangeBind(queue,
exchange,
binding.getBindingKey(),
@@ -496,13 +509,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public BasicMessageConsumer_0_10 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
final int prefetchLow, final boolean noLocal,
final boolean exclusive, String messageSelector,
- final FieldTable ft, final boolean noConsume,
+ final FieldTable rawSelector, final boolean noConsume,
final boolean autoClose) throws JMSException
{
final AMQProtocolHandler protocolHandler = getProtocolHandler();
return new BasicMessageConsumer_0_10(_channelId, _connection, destination, messageSelector, noLocal,
- _messageFactoryRegistry, this, protocolHandler, ft, prefetchHigh,
+ _messageFactoryRegistry, this, protocolHandler, rawSelector, prefetchHigh,
prefetchLow, exclusive, _acknowledgeMode, noConsume, autoClose);
}
@@ -568,56 +581,30 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* Registers the consumer with the broker
*/
public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
- boolean nowait, String messageSelector, int tag)
+ boolean nowait, MessageFilter messageSelector, int tag)
throws AMQException, FailoverException
{
- boolean preAcquire;
-
- long capacity = getCapacity(consumer.getDestination());
-
- try
- {
- boolean isTopic;
- Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments());
-
- if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL)
- {
- isTopic = consumer.getDestination() instanceof AMQTopic ||
- consumer.getDestination().getExchangeClass().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS) ;
-
- preAcquire = isTopic || (!consumer.isNoConsume() &&
- (consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")));
- }
- else
- {
- isTopic = consumer.getDestination().getAddressType() == AMQDestination.TOPIC_TYPE;
-
- preAcquire = !consumer.isNoConsume() &&
- (isTopic || consumer.getMessageSelector() == null ||
- consumer.getMessageSelector().equals(""));
-
- arguments.putAll(
- (Map<? extends String, ? extends Object>) consumer.getDestination().getLink().getSubscription().getArgs());
- }
-
- boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE;
-
- if (consumer.getDestination().getLink() != null)
- {
- acceptModeNone = consumer.getDestination().getLink().getReliability() == Link.Reliability.UNRELIABLE;
- }
-
- getQpidSession().messageSubscribe
- (queueName.toString(), String.valueOf(tag),
- acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
- preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments,
- consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
- }
- catch (JMSException e)
+ boolean preAcquire = consumer.isPreAcquire();
+
+ AMQDestination destination = consumer.getDestination();
+ long capacity = consumer.getCapacity();
+
+ Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments());
+
+ Link link = destination.getLink();
+ if (link != null && link.getSubscription() != null && link.getSubscription().getArgs() != null)
{
- throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e);
+ arguments.putAll((Map<? extends String, ? extends Object>) link.getSubscription().getArgs());
}
+ boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE;
+
+ getQpidSession().messageSubscribe
+ (queueName.toString(), String.valueOf(tag),
+ acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
+ preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments,
+ consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+
String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString();
if (capacity == 0)
@@ -646,21 +633,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
- private long getCapacity(AMQDestination destination)
- {
- long capacity = 0;
- if (destination.getDestSyntax() == DestSyntax.ADDR &&
- destination.getLink().getConsumerCapacity() > 0)
- {
- capacity = destination.getLink().getConsumerCapacity();
- }
- else if (prefetch())
- {
- capacity = getAMQConnection().getMaxPrefetch();
- }
- return capacity;
- }
-
/**
* Create an 0_10 message producer
*/
@@ -825,7 +797,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
//only set if msg list is null
try
{
- long capacity = getCapacity(consumer.getDestination());
+ long capacity = consumer.getCapacity();
if (capacity == 0)
{
@@ -969,17 +941,23 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
/**
* Store non committed messages for this session
- * With 0.10 messages are consumed with window mode, we must send a completion
- * before the window size is reached so credits don't dry up.
* @param id
*/
@Override protected void addDeliveredMessage(long id)
{
_txRangeSet.add((int) id);
_txSize++;
+ }
+
+ /**
+ * With 0.10 messages are consumed with window mode, we must send a completion
+ * before the window size is reached so credits don't dry up.
+ */
+ protected void sendTxCompletionsIfNecessary()
+ {
// this is a heuristic, we may want to have that configurable
- if (_connection.getMaxPrefetch() == 1 ||
- _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0)
+ if (_txSize > 0 && (_connection.getMaxPrefetch() == 1 ||
+ _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0))
{
// send completed so consumer credits don't dry up
messageAcknowledge(_txRangeSet, false);
@@ -1168,8 +1146,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
boolean isConsumer,
boolean noWait) throws AMQException
{
- if (dest.isAddressResolved())
- {
+ if (dest.isAddressResolved() && dest.isResolvedAfter(_connection.getLastFailoverTime()))
+ {
if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType())
{
createSubscriptionQueue(dest);
@@ -1189,22 +1167,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
int type = resolveAddressType(dest);
- if (type == AMQDestination.QUEUE_TYPE &&
- dest.getLink().getReliability() == Reliability.UNSPECIFIED)
- {
- dest.getLink().setReliability(Reliability.AT_LEAST_ONCE);
- }
- else if (type == AMQDestination.TOPIC_TYPE &&
- dest.getLink().getReliability() == Reliability.UNSPECIFIED)
- {
- dest.getLink().setReliability(Reliability.UNRELIABLE);
- }
- else if (type == AMQDestination.TOPIC_TYPE &&
- dest.getLink().getReliability() == Reliability.AT_LEAST_ONCE)
- {
- throw new AMQException("AT-LEAST-ONCE is not yet supported for Topics");
- }
-
switch (type)
{
case AMQDestination.QUEUE_TYPE:
@@ -1258,7 +1220,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
"The name '" + dest.getAddressName() +
"' supplied in the address doesn't resolve to an exchange or a queue");
}
- dest.setAddressResolved(true);
+ dest.setAddressResolved(System.currentTimeMillis());
}
}
@@ -1352,22 +1314,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
dest.setRoutingKey(new AMQShortString(dest.getSubject()));
}
- /** This should be moved to a suitable utility class */
- private String printMap(Map<String,Object> map)
- {
- StringBuilder sb = new StringBuilder();
- sb.append("<");
- if (map != null)
- {
- for(String key : map.keySet())
- {
- sb.append(key).append(" = ").append(map.get(key)).append(" ");
- }
- }
- sb.append(">");
- return sb.toString();
- }
-
protected void acknowledgeImpl()
{
RangeSet range = gatherUnackedRangeSet();
@@ -1378,4 +1324,15 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
getQpidSession().sync();
}
}
+
+ @Override
+ void resubscribe() throws AMQException
+ {
+ // Also reset the delivery tag tracker, to insure we dont
+ // return the first <total number of msgs received on session>
+ // messages sent by the brokers following the first rollback
+ // after failover
+ _highestDeliveryTag.set(-1);
+ super.resubscribe();
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 369c8a6e9d..e33410f5fe 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -41,6 +41,7 @@ import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
@@ -333,24 +334,9 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
AMQShortString queueName,
AMQProtocolHandler protocolHandler,
boolean nowait,
- String messageSelector,
+ MessageFilter messageSelector,
int tag) throws AMQException, FailoverException
{
- FieldTable arguments = FieldTableFactory.newFieldTable();
- if ((messageSelector != null) && !messageSelector.equals(""))
- {
- arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
- }
-
- if (consumer.isAutoClose())
- {
- arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
- }
-
- if (consumer.isNoConsume())
- {
- arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
- }
BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(),
queueName,
@@ -359,7 +345,7 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
consumer.isExclusive(),
nowait,
- arguments);
+ consumer.getArguments());
AMQFrame jmsConsume = body.generateFrame(_channelId);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 3b807591b0..7bb400fada 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -20,10 +20,14 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.filter.JMSSelectorFilter;
+import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.*;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
@@ -31,6 +35,7 @@ import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
@@ -52,7 +57,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
/** The connection being used by this consumer */
protected final AMQConnection _connection;
- protected final String _messageSelector;
+ protected final MessageFilter _messageSelectorFilter;
private final boolean _noLocal;
@@ -138,7 +143,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
*/
private final boolean _autoClose;
- private final boolean _noConsume;
+ private final boolean _browseOnly;
private List<StackTraceElement> _closedStack = null;
@@ -146,28 +151,44 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession session, AMQProtocolHandler protocolHandler,
- FieldTable arguments, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+ FieldTable rawSelector, int prefetchHigh, int prefetchLow,
+ boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
{
_channelId = channelId;
_connection = connection;
- _messageSelector = messageSelector;
_noLocal = noLocal;
_destination = destination;
_messageFactory = messageFactory;
_session = session;
_protocolHandler = protocolHandler;
- _arguments = arguments;
_prefetchHigh = prefetchHigh;
_prefetchLow = prefetchLow;
_exclusive = exclusive;
_synchronousQueue = new LinkedBlockingQueue();
_autoClose = autoClose;
- _noConsume = noConsume;
+ _browseOnly = browseOnly;
+
+ try
+ {
+ if (messageSelector == null || "".equals(messageSelector.trim()))
+ {
+ _messageSelectorFilter = null;
+ }
+ else
+ {
+ _messageSelectorFilter = new JMSSelectorFilter(messageSelector);
+ }
+ }
+ catch (final AMQInternalException ie)
+ {
+ InvalidSelectorException ise = new InvalidSelectorException("cannot create consumer because of selector issue");
+ ise.setLinkedException(ie);
+ throw ise;
+ }
// Force queue browsers not to use acknowledge modes.
- if (_noConsume)
+ if (_browseOnly)
{
_acknowledgeMode = Session.NO_ACKNOWLEDGE;
}
@@ -175,6 +196,21 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
{
_acknowledgeMode = acknowledgeMode;
}
+
+ final FieldTable ft = FieldTableFactory.newFieldTable();
+ // rawSelector is used by HeadersExchange and is not a JMS Selector
+ if (rawSelector != null)
+ {
+ ft.addAll(rawSelector);
+ }
+
+ // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a
+ // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise
+ // possible to determine when querying the broker whether there are no arguments or just a non-matching selector
+ // argument, as specifying null for the arguments when querying means they should not be checked at all
+ ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
+
+ _arguments = ft;
}
public AMQDestination getDestination()
@@ -186,7 +222,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
{
checkPreConditions();
- return _messageSelector;
+ return _messageSelectorFilter == null ? null :_messageSelectorFilter.getSelector();
}
public MessageListener getMessageListener() throws JMSException
@@ -345,6 +381,11 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
return _receiving.get();
}
+ public MessageFilter getMessageSelectorFilter()
+ {
+ return _messageSelectorFilter;
+ }
+
public Message receive() throws JMSException
{
return receive(0);
@@ -874,9 +915,9 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
return _autoClose;
}
- public boolean isNoConsume()
+ public boolean isBrowseOnly()
{
- return _noConsume;
+ return _browseOnly;
}
public void rollback()
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 3c24c67f9b..47c20b683c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -20,22 +20,20 @@ package org.apache.qpid.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.client.AMQDestination.AddressOption;
-import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInternalException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.transport.*;
-import org.apache.qpid.filter.MessageFilter;
-import org.apache.qpid.filter.JMSSelectorFilter;
+import org.apache.qpid.jms.Session;
-import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
+
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -51,11 +49,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
protected final Logger _logger = LoggerFactory.getLogger(getClass());
/**
- * The message selector filter associated with this consumer message selector
- */
- private MessageFilter _filter = null;
-
- /**
* The underlying QpidSession
*/
private AMQSession_0_10 _0_10session;
@@ -63,7 +56,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
/**
* Indicates whether this consumer receives pre-acquired messages
*/
- private boolean _preAcquire = true;
+ private final boolean _preAcquire;
/**
* Specify whether this consumer is performing a sync receive
@@ -71,44 +64,27 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
private String _consumerTagString;
- private long capacity = 0;
+ private final long _capacity;
+
+ /** Flag indicating if the server supports message selectors */
+ protected final boolean _serverJmsSelectorSupport;
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
- AMQSession session, AMQProtocolHandler protocolHandler,
- FieldTable arguments, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+ AMQSession<?,?> session, AMQProtocolHandler protocolHandler,
+ FieldTable rawSelector, int prefetchHigh, int prefetchLow,
+ boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose)
throws JMSException
{
super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
- arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
+ rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose);
_0_10session = (AMQSession_0_10) session;
- if (messageSelector != null && !messageSelector.equals(""))
- {
- try
- {
- _filter = new JMSSelectorFilter(messageSelector);
- }
- catch (AMQInternalException e)
- {
- throw new InvalidSelectorException("cannot create consumer because of selector issue");
- }
- if (destination instanceof AMQQueue)
- {
- _preAcquire = false;
- }
- }
-
- // Destination setting overrides connection defaults
- if (destination.getDestSyntax() == DestSyntax.ADDR &&
- destination.getLink().getConsumerCapacity() > 0)
- {
- capacity = destination.getLink().getConsumerCapacity();
- }
- else if (getSession().prefetch())
- {
- capacity = _0_10session.getAMQConnection().getMaxPrefetch();
- }
+
+ _preAcquire = evaluatePreAcquire(browseOnly, destination);
+
+ _capacity = evaluateCapacity(destination);
+ _serverJmsSelectorSupport = connection.isSupportedServerFeature(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR);
+
if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType())
{
@@ -122,7 +98,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
}
-
@Override public void setConsumerTag(int consumerTag)
{
super.setConsumerTag(consumerTag);
@@ -148,15 +123,22 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
if (checkPreConditions(jmsMessage))
{
- if (isMessageListenerSet() && capacity == 0)
+ if (isMessageListenerSet() && _capacity == 0)
{
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
+ messageFlow();
}
_logger.debug("messageOk, trying to notify");
super.notifyMessage(jmsMessage);
}
+ else
+ {
+ // if we are synchronously waiting for a message
+ // and messages are not pre-fetched we then need to request another one
+ if(_capacity == 0)
+ {
+ messageFlow();
+ }
+ }
}
catch (AMQException e)
{
@@ -227,12 +209,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
private boolean checkPreConditions(AbstractJMSMessage message) throws AMQException
{
boolean messageOk = true;
- // TODO Use a tag for fiding out if message filtering is done here or by the broker.
try
{
- if (_messageSelector != null && !_messageSelector.equals(""))
+ if (_messageSelectorFilter != null && !_serverJmsSelectorSupport)
{
- messageOk = _filter.matches(message);
+ messageOk = _messageSelectorFilter.matches(message);
}
}
catch (Exception e)
@@ -245,6 +226,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
_logger.debug("messageOk " + messageOk);
_logger.debug("_preAcquire " + _preAcquire);
}
+
if (!messageOk)
{
if (_preAcquire)
@@ -261,23 +243,15 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Message not OK, releasing");
+ _logger.debug("filterMessage - not ack'ing message as not acquired");
}
- releaseMessage(message);
- }
- // if we are syncrhonously waiting for a message
- // and messages are not prefetched we then need to request another one
- if(capacity == 0)
- {
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
+ flushUnwantedMessage(message);
}
}
- // now we need to acquire this message if needed
- // this is the case of queue with a message selector set
- if (!_preAcquire && messageOk && !isNoConsume())
+ else if (!_preAcquire && !isBrowseOnly())
{
+ // now we need to acquire this message if needed
+ // this is the case of queue with a message selector set
if (_logger.isDebugEnabled())
{
_logger.debug("filterMessage - trying to acquire message");
@@ -285,6 +259,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
messageOk = acquireMessage(message);
_logger.debug("filterMessage - message acquire status : " + messageOk);
}
+
return messageOk;
}
@@ -295,38 +270,38 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
* @param message The message to be acknowledged
* @throws AMQException If the message cannot be acquired due to some internal error.
*/
- private void acknowledgeMessage(AbstractJMSMessage message) throws AMQException
+ private void acknowledgeMessage(final AbstractJMSMessage message) throws AMQException
{
- if (!_preAcquire)
- {
- RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
- _0_10session.messageAcknowledge
- (ranges,
- _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+ final RangeSet ranges = new RangeSet();
+ ranges.add((int) message.getDeliveryTag());
+ _0_10session.messageAcknowledge
+ (ranges,
+ _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
- AMQException amqe = _0_10session.getCurrentException();
- if (amqe != null)
- {
- throw amqe;
- }
+ final AMQException amqe = _0_10session.getCurrentException();
+ if (amqe != null)
+ {
+ throw amqe;
}
}
/**
- * Release a message
+ * Flush an unwanted message. For 0-10 we need to ensure that all messages are indicated
+ * processed to ensure their AMQP command-id is marked completed.
*
- * @param message The message to be released
- * @throws AMQException If the message cannot be released due to some internal error.
+ * @param message The unwanted message to be flushed
+ * @throws AMQException If the unwanted message cannot be flushed due to some internal error.
*/
- private void releaseMessage(AbstractJMSMessage message) throws AMQException
+ private void flushUnwantedMessage(final AbstractJMSMessage message) throws AMQException
{
- if (_preAcquire)
+ final RangeSet ranges = new RangeSet();
+ ranges.add((int) message.getDeliveryTag());
+ _0_10session.flushProcessed(ranges,false);
+
+ final AMQException amqe = _0_10session.getCurrentException();
+ if (amqe != null)
{
- RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
- _0_10session.getQpidSession().messageRelease(ranges);
- _0_10session.sync();
+ throw amqe;
}
}
@@ -337,36 +312,37 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
* @return true if the message has been acquired, false otherwise.
* @throws AMQException If the message cannot be acquired due to some internal error.
*/
- private boolean acquireMessage(AbstractJMSMessage message) throws AMQException
+ private boolean acquireMessage(final AbstractJMSMessage message) throws AMQException
{
boolean result = false;
- if (!_preAcquire)
- {
- RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
+ final RangeSet ranges = new RangeSet();
+ ranges.add((int) message.getDeliveryTag());
- Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
+ final Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
- RangeSet acquired = acq.getTransfers();
- if (acquired != null && acquired.size() > 0)
- {
- result = true;
- }
+ final RangeSet acquired = acq.getTransfers();
+ if (acquired != null && acquired.size() > 0)
+ {
+ result = true;
}
return result;
}
+ private void messageFlow()
+ {
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
+ }
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
super.setMessageListener(messageListener);
try
{
- if (messageListener != null && capacity == 0)
+ if (messageListener != null && _capacity == 0)
{
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
+ messageFlow();
}
if (messageListener != null && !_synchronousQueue.isEmpty())
{
@@ -389,9 +365,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
if (_0_10session.isStarted() && _syncReceive.get())
{
- _0_10session.getQpidSession().messageFlow
- (getConsumerTagString(), MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
+ messageFlow();
}
}
@@ -406,15 +380,13 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
*/
public Object getMessageFromQueue(long l) throws InterruptedException
{
- if (capacity == 0)
+ if (_capacity == 0)
{
_syncReceive.set(true);
}
- if (_0_10session.isStarted() && capacity == 0 && _synchronousQueue.isEmpty())
+ if (_0_10session.isStarted() && _capacity == 0 && _synchronousQueue.isEmpty())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
+ messageFlow();
}
Object o = super.getMessageFromQueue(l);
if (o == null && _0_10session.isStarted())
@@ -427,18 +399,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
(getConsumerTagString(), MessageCreditUnit.BYTE,
0xFFFFFFFF, Option.UNRELIABLE);
- if (capacity > 0)
+ if (_capacity > 0)
{
_0_10session.getQpidSession().messageFlow
(getConsumerTagString(),
MessageCreditUnit.MESSAGE,
- capacity,
+ _capacity,
Option.UNRELIABLE);
}
_0_10session.syncDispatchQueue();
o = super.getMessageFromQueue(-1);
}
- if (capacity == 0)
+ if (_capacity == 0)
{
_syncReceive.set(false);
}
@@ -448,16 +420,26 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
void postDeliver(AbstractJMSMessage msg)
{
super.postDeliver(msg);
- if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery())
+
+ switch (_acknowledgeMode)
{
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ case Session.SESSION_TRANSACTED:
+ _0_10session.sendTxCompletionsIfNecessary();
+ break;
+ case Session.NO_ACKNOWLEDGE:
+ if (!_session.isInRecovery())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+ break;
+ case Session.AUTO_ACKNOWLEDGE:
+ if (!_session.isInRecovery() && _session.getAMQConnection().getSyncAck())
+ {
+ ((AMQSession_0_10) getSession()).getQpidSession().sync();
+ }
+ break;
}
- if (_acknowledgeMode == org.apache.qpid.jms.Session.AUTO_ACKNOWLEDGE &&
- !_session.isInRecovery() && _session.getAMQConnection().getSyncAck())
- {
- ((AMQSession_0_10) getSession()).getQpidSession().sync();
- }
}
Message receiveBrowse() throws JMSException
@@ -526,4 +508,51 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
}
}
+
+ long getCapacity()
+ {
+ return _capacity;
+ }
+
+ boolean isPreAcquire()
+ {
+ return _preAcquire;
+ }
+
+ private boolean evaluatePreAcquire(boolean browseOnly, AMQDestination destination)
+ {
+ boolean preAcquire;
+ if (browseOnly)
+ {
+ preAcquire = false;
+ }
+ else
+ {
+ boolean isQueue = (destination instanceof AMQQueue || getDestination().getAddressType() == AMQDestination.QUEUE_TYPE);
+ if (isQueue && getMessageSelectorFilter() != null)
+ {
+ preAcquire = false;
+ }
+ else
+ {
+ preAcquire = true;
+ }
+ }
+ return preAcquire;
+ }
+
+ private long evaluateCapacity(AMQDestination destination)
+ {
+ long capacity = 0;
+ if (destination.getLink() != null && destination.getLink().getConsumerCapacity() > 0)
+ {
+ capacity = destination.getLink().getConsumerCapacity();
+ }
+ else if (getSession().prefetch())
+ {
+ capacity = _0_10session.getAMQConnection().getMaxPrefetch();
+ }
+ return capacity;
+ }
+
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index 00acd5e866..cf1d7cedeb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -20,16 +20,14 @@
*/
package org.apache.qpid.client;
-import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInternalException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.filter.JMSSelectorFilter;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,24 +38,23 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
- AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException
+ AMQProtocolHandler protocolHandler, FieldTable rawSelector, int prefetchHigh, int prefetchLow,
+ boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
{
super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session,
- protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive,
- acknowledgeMode, noConsume, autoClose);
- try
+ protocolHandler, rawSelector, prefetchHigh, prefetchLow, exclusive,
+ acknowledgeMode, browseOnly, autoClose);
+ final FieldTable consumerArguments = getArguments();
+ if (isAutoClose())
{
-
- if (messageSelector != null && messageSelector.length() > 0)
- {
- JMSSelectorFilter _filter = new JMSSelectorFilter(messageSelector);
- }
+ consumerArguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
}
- catch (AMQInternalException e)
+
+ if (isBrowseOnly())
{
- throw new InvalidSelectorException("cannot create consumer because of selector issue");
+ consumerArguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
}
+
}
void sendCancel() throws AMQException, FailoverException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 57f64c2f92..16afa51c74 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -238,7 +238,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
}
catch (Exception e)
{
- JMSException jmse = new JMSException("Exception when sending message");
+ JMSException jmse = new JMSException("Exception when sending message:" + e.getMessage());
jmse.setLinkedException(e);
jmse.initCause(e);
throw jmse;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
index 5f97d625b4..c73d800b14 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
@@ -20,18 +20,14 @@
*/
package org.apache.qpid.client.messaging.address;
-import static org.apache.qpid.client.messaging.address.Link.Reliability.UNSPECIFIED;
-
import java.util.HashMap;
import java.util.Map;
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
-
public class Link
{
public enum FilterType { SQL92, XQUERY, SUBJECT }
- public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE, UNSPECIFIED }
+ public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE }
protected String name;
protected String _filter;
@@ -42,7 +38,7 @@ public class Link
protected int _producerCapacity = 0;
protected Node node;
protected Subscription subscription;
- protected Reliability reliability = UNSPECIFIED;
+ protected Reliability reliability = Reliability.AT_LEAST_ONCE;
public Reliability getReliability()
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index a938bd47f8..b7253e6e9c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -47,6 +47,7 @@ import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -362,7 +363,15 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public void closeProtocolSession() throws AMQException
{
- _protocolHandler.closeConnection(0);
+ try
+ {
+ _protocolHandler.getNetworkConnection().close();
+ }
+ catch(TransportException e)
+ {
+ //ignore such exceptions, they were already logged
+ //and this is a forcible close.
+ }
}
public void failover(String host, int port)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java b/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java
index 4159986090..a1b4aff659 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java
@@ -26,20 +26,21 @@ import org.slf4j.LoggerFactory;
public class JMSSelectorFilter implements MessageFilter
{
- /**
- * this JMSSelectorFilter's logger
- */
private static final Logger _logger = LoggerFactory.getLogger(JMSSelectorFilter.class);
- private String _selector;
- private BooleanExpression _matcher;
+ private final String _selector;
+ private final BooleanExpression _matcher;
public JMSSelectorFilter(String selector) throws AMQInternalException
{
+ if (selector == null || "".equals(selector))
+ {
+ throw new IllegalArgumentException("Cannot create a JMSSelectorFilter with a null or empty selector string");
+ }
_selector = selector;
- if (JMSSelectorFilter._logger.isDebugEnabled())
+ if (_logger.isDebugEnabled())
{
- JMSSelectorFilter._logger.debug("Created JMSSelectorFilter with selector:" + _selector);
+ _logger.debug("Created JMSSelectorFilter with selector:" + _selector);
}
_matcher = new SelectorParser().parse(selector);
}
@@ -49,16 +50,15 @@ public class JMSSelectorFilter implements MessageFilter
try
{
boolean match = _matcher.matches(message);
- if (JMSSelectorFilter._logger.isDebugEnabled())
+ if (_logger.isDebugEnabled())
{
- JMSSelectorFilter._logger.debug(message + " match(" + match + ") selector(" + System
- .identityHashCode(_selector) + "):" + _selector);
+ _logger.debug(message + " match(" + match + ") selector(" + _selector + "): " + _selector);
}
return match;
}
catch (AMQInternalException e)
{
- JMSSelectorFilter._logger.warn("Caght exception when evaluating message selector for message " + message, e);
+ _logger.warn("Caught exception when evaluating message selector for message " + message, e);
}
return false;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java b/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java
index 62e4a28c1e..ec0e8ea4c0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java
@@ -17,11 +17,11 @@
*/
package org.apache.qpid.filter;
-import org.apache.qpid.AMQInternalException;
import org.apache.qpid.client.message.AbstractJMSMessage;
public interface MessageFilter
{
- boolean matches(AbstractJMSMessage message) throws AMQInternalException;
+ boolean matches(AbstractJMSMessage message);
+ String getSelector();
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
index 849827216c..68531eee84 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
@@ -29,7 +29,6 @@ import javax.jms.MessageProducer;
import junit.framework.TestCase;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.transport.Binary;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.Connection.SessionFactory;
@@ -334,7 +333,7 @@ public class AMQSession_0_10Test extends TestCase
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
session.sendConsume(consumer, new AMQShortString("test"), null, true, null, 1);
}
catch (Exception e)
@@ -383,7 +382,7 @@ public class AMQSession_0_10Test extends TestCase
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
session.start();
consumer.receive(1);
fail("JMSException should be thrown");
@@ -401,7 +400,7 @@ public class AMQSession_0_10Test extends TestCase
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
session.start();
consumer.receive(1);
}
@@ -419,7 +418,7 @@ public class AMQSession_0_10Test extends TestCase
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
session.start();
consumer.receiveNoWait();
fail("JMSException should be thrown");
@@ -437,7 +436,7 @@ public class AMQSession_0_10Test extends TestCase
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
consumer.setMessageListener(new MockMessageListener());
fail("JMSException should be thrown");
}
@@ -454,7 +453,7 @@ public class AMQSession_0_10Test extends TestCase
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
consumer.setMessageListener(new MockMessageListener());
}
catch (Exception e)
@@ -471,7 +470,7 @@ public class AMQSession_0_10Test extends TestCase
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
consumer.close();
}
catch (Exception e)
@@ -488,7 +487,7 @@ public class AMQSession_0_10Test extends TestCase
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
consumer.close();
fail("JMSException should be thrown");
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/filter/JMSSelectorFilterTest.java b/qpid/java/client/src/test/java/org/apache/qpid/filter/JMSSelectorFilterTest.java
new file mode 100644
index 0000000000..d4d8ea4350
--- /dev/null
+++ b/qpid/java/client/src/test/java/org/apache/qpid/filter/JMSSelectorFilterTest.java
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.filter;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.client.message.JMSTextMessage;
+import org.apache.qpid.client.message.TestMessageHelper;
+
+public class JMSSelectorFilterTest extends TestCase
+{
+
+ public void testEmptySelectorFilter() throws Exception
+ {
+ try
+ {
+ new JMSSelectorFilter("");
+ fail("Should not be able to create a JMSSelectorFilter with an empty selector");
+ }
+ catch (IllegalArgumentException iae)
+ {
+ // pass
+ }
+ }
+
+ public void testNullSelectorFilter() throws Exception
+ {
+ try
+ {
+ new JMSSelectorFilter(null);
+ fail("Should not be able to create a JMSSelectorFilter with a null selector");
+ }
+ catch (IllegalArgumentException iae)
+ {
+ // pass
+ }
+ }
+
+ public void testInvalidSelectorFilter() throws Exception
+ {
+ try
+ {
+ new JMSSelectorFilter("$%^");
+ fail("Unparsable selector so expected AMQInternalException to be thrown");
+ }
+ catch (AMQInternalException amqie)
+ {
+ // pass
+ }
+ }
+
+ public void testSimpleSelectorFilter() throws Exception
+ {
+ MessageFilter simpleSelectorFilter = new JMSSelectorFilter("select=5");
+
+ assertNotNull("Filter object is null", simpleSelectorFilter);
+ assertNotNull("Selector string is null", simpleSelectorFilter.getSelector());
+ assertEquals("Unexpected selector", "select=5", simpleSelectorFilter.getSelector());
+ assertTrue("Filter object is invalid", simpleSelectorFilter != null);
+
+ final JMSTextMessage message = TestMessageHelper.newJMSTextMessage();
+
+ message.setIntProperty("select", 4);
+ assertFalse("Selector did match when not expected", simpleSelectorFilter.matches(message));
+ message.setIntProperty("select", 5);
+ assertTrue("Selector didnt match when expected", simpleSelectorFilter.matches(message));
+ message.setIntProperty("select", 6);
+ assertFalse("Selector did match when not expected", simpleSelectorFilter.matches(message));
+ }
+
+ public void testFailedMatchingFilter() throws Exception
+ {
+ MessageFilter simpleSelectorFilter = new JMSSelectorFilter("select>4");
+
+ assertNotNull("Filter object is null", simpleSelectorFilter);
+ assertNotNull("Selector string is null", simpleSelectorFilter.getSelector());
+ assertEquals("Unexpected selector", "select>4", simpleSelectorFilter.getSelector());
+ assertTrue("Filter object is invalid", simpleSelectorFilter != null);
+
+ final JMSTextMessage message = TestMessageHelper.newJMSTextMessage();
+
+ message.setStringProperty("select", "5");
+ assertFalse("Selector matched when not expected", simpleSelectorFilter.matches(message));
+ message.setStringProperty("select", "elephant");
+ assertFalse("Selector matched when not expected", simpleSelectorFilter.matches(message));
+ message.setBooleanProperty("select", false);
+ assertFalse("Selector matched when not expected", simpleSelectorFilter.matches(message));
+ }
+}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
index b5e7ae82b5..cd18b5181f 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
@@ -47,12 +47,17 @@ public class MessageConverterTest extends TestCase
protected JMSTextMessage testTextMessage;
protected JMSMapMessage testMapMessage;
- private AMQSession _session = new TestAMQSession();
+ private AMQConnection _connection;
+ private AMQSession _session;
protected void setUp() throws Exception
{
super.setUp();
+
+ _connection = new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='tcp://localhost:1'");
+ _session = new TestAMQSession(_connection);
+
testTextMessage = new JMSTextMessage(AMQMessageDelegateFactory.FACTORY_0_8);
//Set Message Text
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
index 6759b43387..06d0f4a3f9 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
@@ -29,22 +29,25 @@ import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.BasicMessageConsumer_0_8;
import org.apache.qpid.client.BasicMessageProducer_0_8;
+import org.apache.qpid.client.MockAMQConnection;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
- public TestAMQSession()
+ public TestAMQSession(AMQConnection connection)
{
- super(null, 0, false, AUTO_ACKNOWLEDGE, null, 0, 0);
+ super(connection, 0, false, AUTO_ACKNOWLEDGE, null, 0, 0);
}
public void acknowledgeMessage(long deliveryTag, boolean multiple)
@@ -124,7 +127,7 @@ public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMe
return false;
}
- public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException
+ public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector, int tag) throws AMQException, FailoverException
{
}
diff --git a/qpid/java/common.xml b/qpid/java/common.xml
index c6688ee2de..26537e7dfc 100644
--- a/qpid/java/common.xml
+++ b/qpid/java/common.xml
@@ -23,7 +23,7 @@
<dirname property="project.root" file="${ant.file.common}"/>
<property name="project.name" value="qpid"/>
- <property name="project.version" value="0.13"/>
+ <property name="project.version" value="0.15"/>
<property name="project.url" value="http://qpid.apache.org"/>
<property name="project.groupid" value="org.apache.qpid"/>
<property name="project.namever" value="${project.name}-${project.version}"/>
diff --git a/qpid/java/common/src/main/java/common.bnd b/qpid/java/common/src/main/java/common.bnd
index f12fbf9273..64e80c9b43 100755
--- a/qpid/java/common/src/main/java/common.bnd
+++ b/qpid/java/common/src/main/java/common.bnd
@@ -17,7 +17,7 @@
# under the License.
#
-ver: 0.13.0
+ver: 0.15.0
Bundle-SymbolicName: qpid-common
Bundle-Version: ${ver}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java b/qpid/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java
new file mode 100644
index 0000000000..aa262bdde5
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.common;
+
+/**
+ * Keys names used within the serverProperties argument of the ConnectionStart
+ * method. These property names are Qpid specific.
+ */
+public final class ServerPropertyNames
+{
+ /**
+ * Server property: federation tag UUID
+ */
+ public static final String FEDERATION_TAG = "qpid.federation_tag";
+
+ /**
+ * Server property: array of features supported by the server.
+ */
+ public static final String QPID_FEATURES = "qpid.features";
+
+ /**
+ * Feature: Signifies that a server supports JMS selectors.
+ */
+ public static final String FEATURE_QPID_JMS_SELECTOR = "qpid.jms-selector";
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
index 721c821bab..4a126b8504 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
@@ -832,9 +832,12 @@ public class FieldTable
public void addAll(FieldTable fieldTable)
{
initMapIfNecessary();
- _encodedForm = null;
- _properties.putAll(fieldTable._properties);
- recalculateEncodedSize();
+ if (fieldTable._properties != null)
+ {
+ _encodedForm = null;
+ _properties.putAll(fieldTable._properties);
+ recalculateEncodedSize();
+ }
}
public static Map<String, Object> convertToMap(final FieldTable fieldTable)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 1c521244d0..b78433052c 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -526,10 +526,6 @@ public class Connection extends ConnectionInvoker
{
synchronized (lock)
{
- for (Session ssn : channels.values())
- {
- ssn.closeCode(close);
- }
ConnectionCloseCode code = close.getReplyCode();
if (code != ConnectionCloseCode.NORMAL)
{
@@ -701,8 +697,17 @@ public class Connection extends ConnectionInvoker
return channels.values();
}
- public boolean hasSessionWithName(final String name)
+ public boolean hasSessionWithName(final byte[] name)
{
- return sessions.containsKey(new Binary(name.getBytes()));
+ return sessions.containsKey(new Binary(name));
+ }
+
+ public void notifyFailoverRequired()
+ {
+ List<Session> values = new ArrayList<Session>(channels.values());
+ for (Session ssn : values)
+ {
+ ssn.notifyFailoverRequired();
+ }
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
index 82fa6ca473..07d21c9904 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
@@ -195,10 +195,17 @@ public class ServerDelegate extends ConnectionDelegate
@Override
public void sessionAttach(Connection conn, SessionAttach atc)
{
+ sessionAttachImpl(conn, atc);
+ }
+
+ protected Session sessionAttachImpl(Connection conn, SessionAttach atc)
+ {
Session ssn = getSession(conn, atc);
conn.map(ssn, atc.getChannel());
ssn.sessionAttached(atc.getName());
ssn.setState(Session.State.OPEN);
+
+ return ssn;
}
protected void setConnectionTuneOkChannelMax(final Connection conn, final int okChannelMax)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 0de558d152..321e5256b2 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -50,6 +50,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Session
@@ -125,6 +126,8 @@ public class Session extends SessionInvoker
private SessionDetachCode detachCode;
private final Object stateLock = new Object();
+ private final AtomicBoolean _failoverRequired = new AtomicBoolean(false);
+
protected Session(Connection connection, Binary name, long expiry)
{
this(connection, new SessionDelegate(), name, expiry);
@@ -257,6 +260,7 @@ public class Session extends SessionInvoker
void resume()
{
+ _failoverRequired.set(false);
synchronized (commands)
{
attach();
@@ -459,7 +463,7 @@ public class Session extends SessionInvoker
synchronized (commands)
{
- if (state == DETACHED || state == CLOSING)
+ if (state == DETACHED || state == CLOSING || state == CLOSED)
{
return;
}
@@ -583,30 +587,25 @@ public class Session extends SessionInvoker
synchronized (commands)
{
- //allow the txSelect operation to be invoked during resume
- boolean skipWait = m instanceof TxSelect && state == RESUMING;
-
- if(!skipWait)
+ if (state == DETACHED && m.isUnreliable())
{
- if (state == DETACHED && m.isUnreliable())
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer))
{
- Thread current = Thread.currentThread();
- if (!current.equals(resumer))
- {
- return;
- }
+ return;
}
+ }
- if (state != OPEN && state != CLOSED && state != CLOSING)
+ if (state != OPEN && state != CLOSED && state != CLOSING)
+ {
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer) )
{
- Thread current = Thread.currentThread();
- if (!current.equals(resumer))
+ Waiter w = new Waiter(commands, timeout);
+ while (w.hasTime() && (state != OPEN && state != CLOSED))
{
- Waiter w = new Waiter(commands, timeout);
- while (w.hasTime() && (state != OPEN && state != CLOSED))
- {
- w.await();
- }
+ checkFailoverRequired("Command was interrupted because of failover, before being sent");
+ w.await();
}
}
}
@@ -674,6 +673,7 @@ public class Session extends SessionInvoker
}
}
}
+ checkFailoverRequired("Command was interrupted because of failover, before being sent");
w.await();
}
}
@@ -768,6 +768,14 @@ public class Session extends SessionInvoker
}
}
+ private void checkFailoverRequired(String message)
+ {
+ if (_failoverRequired.get())
+ {
+ throw new SessionException(message);
+ }
+ }
+
protected boolean shouldIssueFlush(int next)
{
return (next % 65536) == 0;
@@ -793,6 +801,7 @@ public class Session extends SessionInvoker
Waiter w = new Waiter(commands, timeout);
while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
{
+ checkFailoverRequired("Session sync was interrupted by failover.");
log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, commands);
w.await();
}
@@ -853,13 +862,6 @@ public class Session extends SessionInvoker
}
}
- private ConnectionClose close = null;
-
- void closeCode(ConnectionClose close)
- {
- this.close = close;
- }
-
ExecutionException getException()
{
synchronized (results)
@@ -910,6 +912,7 @@ public class Session extends SessionInvoker
Waiter w = new Waiter(this, timeout);
while (w.hasTime() && state != CLOSED && !isDone())
{
+ checkFailoverRequired("Operation was interrupted by failover.");
log.debug("%s waiting for result: %s", Session.this, this);
w.await();
}
@@ -921,7 +924,12 @@ public class Session extends SessionInvoker
}
else if (state == CLOSED)
{
- throw new SessionException(getException());
+ ExecutionException ex = getException();
+ if(ex == null)
+ {
+ throw new SessionClosedException();
+ }
+ throw new SessionException(ex);
}
else
{
@@ -1001,6 +1009,7 @@ public class Session extends SessionInvoker
Waiter w = new Waiter(commands, timeout);
while (w.hasTime() && state != CLOSED)
{
+ checkFailoverRequired("close() was interrupted by failover.");
w.await();
}
@@ -1095,6 +1104,7 @@ public class Session extends SessionInvoker
Waiter w = new Waiter(stateLock, timeout);
while (w.hasTime() && state == NEW)
{
+ checkFailoverRequired("Session opening was interrupted by failover.");
w.await();
}
}
@@ -1117,4 +1127,26 @@ public class Session extends SessionInvoker
{
return stateLock;
}
+
+ protected void notifyFailoverRequired()
+ {
+ //ensure any operations waiting are aborted to
+ //prevent them waiting for timeout for 60 seconds
+ //and possibly preventing failover proceeding
+ _failoverRequired.set(true);
+ synchronized (commands)
+ {
+ commands.notifyAll();
+ }
+ synchronized (results)
+ {
+ for (ResultFuture<?> result : results.values())
+ {
+ synchronized(result)
+ {
+ result.notifyAll();
+ }
+ }
+ }
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java b/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java
index a6a8b8beb4..fe1a300479 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java
@@ -257,4 +257,19 @@ public final class Strings
return join(sep, Arrays.asList(items));
}
+ public static String printMap(Map<String,Object> map)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append("<");
+ if (map != null)
+ {
+ for(String key : map.keySet())
+ {
+ sb.append(key).append(" = ").append(map.get(key)).append(" ");
+ }
+ }
+ sb.append(">");
+ return sb.toString();
+ }
+
}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java b/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java
index bb4c9c3884..bd189feb1c 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java
@@ -20,17 +20,19 @@
*/
package org.apache.qpid.framing;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
import junit.framework.Assert;
import junit.framework.TestCase;
-import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQPInvalidClassException;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.*;
-
public class PropertyFieldTableTest extends TestCase
{
private static final Logger _logger = LoggerFactory.getLogger(PropertyFieldTableTest.class);
@@ -106,7 +108,7 @@ public class PropertyFieldTableTest extends TestCase
table1.setByte("value", Byte.MAX_VALUE);
Assert.assertTrue(table1.propertyExists("value"));
- // Tets lookups we shouldn't get anything back for other gets
+ // Tests lookups we shouldn't get anything back for other gets
// we should get right value back for this type ....
Assert.assertEquals(null, table1.getBoolean("value"));
Assert.assertEquals(Byte.MAX_VALUE, (byte) table1.getByte("value"));
@@ -139,7 +141,7 @@ public class PropertyFieldTableTest extends TestCase
table1.setShort("value", Short.MAX_VALUE);
Assert.assertTrue(table1.propertyExists("value"));
- // Tets lookups we shouldn't get anything back for other gets
+ // Tests lookups we shouldn't get anything back for other gets
// we should get right value back for this type ....
Assert.assertEquals(null, table1.getBoolean("value"));
Assert.assertEquals(null, table1.getByte("value"));
@@ -172,7 +174,7 @@ public class PropertyFieldTableTest extends TestCase
table1.setChar("value", 'c');
Assert.assertTrue(table1.propertyExists("value"));
- // Tets lookups we shouldn't get anything back for other gets
+ // Tests lookups we shouldn't get anything back for other gets
// we should get right value back for this type ....
Assert.assertEquals(null, table1.getBoolean("value"));
Assert.assertEquals(null, table1.getByte("value"));
@@ -206,7 +208,7 @@ public class PropertyFieldTableTest extends TestCase
table1.setDouble("value", Double.MAX_VALUE);
Assert.assertTrue(table1.propertyExists("value"));
- // Tets lookups we shouldn't get anything back for other gets
+ // Tests lookups we shouldn't get anything back for other gets
// we should get right value back for this type ....
Assert.assertEquals(null, table1.getBoolean("value"));
Assert.assertEquals(null, table1.getByte("value"));
@@ -241,7 +243,7 @@ public class PropertyFieldTableTest extends TestCase
table1.setFloat("value", Float.MAX_VALUE);
Assert.assertTrue(table1.propertyExists("value"));
- // Tets lookups we shouldn't get anything back for other gets
+ // Tests lookups we shouldn't get anything back for other gets
// we should get right value back for this type ....
Assert.assertEquals(null, table1.getBoolean("value"));
Assert.assertEquals(null, table1.getByte("value"));
@@ -404,7 +406,7 @@ public class PropertyFieldTableTest extends TestCase
table1.setString("value", "Hello");
Assert.assertTrue(table1.propertyExists("value"));
- // Tets lookups we shouldn't get anything back for other gets
+ // Test lookups we shouldn't get anything back for other gets
// we should get right value back for this type ....
Assert.assertEquals(null, table1.getBoolean("value"));
Assert.assertEquals(null, table1.getByte("value"));
@@ -569,7 +571,7 @@ public class PropertyFieldTableTest extends TestCase
Assert.assertEquals("Hello", table.getObject("object-string"));
}
- public void testwriteBuffer() throws IOException
+ public void testWriteBuffer() throws IOException
{
byte[] bytes = { 99, 98, 97, 96, 95 };
@@ -950,6 +952,36 @@ public class PropertyFieldTableTest extends TestCase
}
+ public void testAddAll()
+ {
+ final FieldTable table1 = new FieldTable();
+ table1.setInteger("int1", 1);
+ table1.setInteger("int2", 2);
+ assertEquals("Unexpected number of entries in table1", 2, table1.size());
+
+ final FieldTable table2 = new FieldTable();
+ table2.setInteger("int3", 3);
+ table2.setInteger("int4", 4);
+ assertEquals("Unexpected number of entries in table2", 2, table2.size());
+
+ table1.addAll(table2);
+ assertEquals("Unexpected number of entries in table1 after addAll", 4, table1.size());
+ assertEquals(Integer.valueOf(3), table1.getInteger("int3"));
+ }
+
+ public void testAddAllWithEmptyFieldTable()
+ {
+ final FieldTable table1 = new FieldTable();
+ table1.setInteger("int1", 1);
+ table1.setInteger("int2", 2);
+ assertEquals("Unexpected number of entries in table1", 2, table1.size());
+
+ final FieldTable emptyFieldTable = new FieldTable();
+
+ table1.addAll(emptyFieldTable);
+ assertEquals("Unexpected number of entries in table1 after addAll", 2, table1.size());
+ }
+
private void assertBytesEqual(byte[] expected, byte[] actual)
{
Assert.assertEquals(expected.length, actual.length);
diff --git a/qpid/java/ivy.xml b/qpid/java/ivy.xml
index 1399db5248..899c5fa79b 100644
--- a/qpid/java/ivy.xml
+++ b/qpid/java/ivy.xml
@@ -18,7 +18,7 @@
<ivy-module version="2.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="http://ant.apache.org/ivy/schemas/ivy.xsd">
- <info organisation="org/apache" module="qpid" revision="0.13"/>
+ <info organisation="org/apache" module="qpid" revision="0.15"/>
<publications xmlns:e="urn:ant.apache.org:ivy-extras">
<artifact name="qpid-client" type="pom" ext="pom"/>
@@ -33,6 +33,24 @@
<artifact name="qpid-common" type="jar.asc" ext="jar.asc"/>
<artifact name="qpid-common" type="source" ext="jar" e:classifier="sources"/>
<artifact name="qpid-common" type="source.asc" ext="jar.asc" e:classifier="sources"/>
+ <artifact name="qpid-broker" type="pom" ext="pom"/>
+ <artifact name="qpid-broker" type="pom.asc" ext="pom.asc"/>
+ <artifact name="qpid-broker" type="jar" ext="jar"/>
+ <artifact name="qpid-broker" type="jar.asc" ext="jar.asc"/>
+ <artifact name="qpid-broker" type="source" ext="jar" e:classifier="sources"/>
+ <artifact name="qpid-broker" type="source.asc" ext="jar.asc" e:classifier="sources"/>
+ <artifact name="qpid-management-common" type="pom" ext="pom"/>
+ <artifact name="qpid-management-common" type="pom.asc" ext="pom.asc"/>
+ <artifact name="qpid-management-common" type="jar" ext="jar"/>
+ <artifact name="qpid-management-common" type="jar.asc" ext="jar.asc"/>
+ <artifact name="qpid-management-common" type="source" ext="jar" e:classifier="sources"/>
+ <artifact name="qpid-management-common" type="source.asc" ext="jar.asc" e:classifier="sources"/>
+ <artifact name="qpid-bdbstore" type="pom" ext="pom"/>
+ <artifact name="qpid-bdbstore" type="pom.asc" ext="pom.asc"/>
+ <artifact name="qpid-bdbstore" type="jar" ext="jar"/>
+ <artifact name="qpid-bdbstore" type="jar.asc" ext="jar.asc"/>
+ <artifact name="qpid-bdbstore" type="source" ext="jar" e:classifier="sources"/>
+ <artifact name="qpid-bdbstore" type="source.asc" ext="jar.asc" e:classifier="sources"/>
</publications>
<dependencies/>
diff --git a/qpid/java/lib/poms/je-4.0.117.xml b/qpid/java/lib/poms/je-4.0.117.xml
new file mode 100644
index 0000000000..85573a99f9
--- /dev/null
+++ b/qpid/java/lib/poms/je-4.0.117.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<dep>
+ <groupId>com.sleepycat</groupId>
+ <artifactId>je</artifactId>
+ <version>4.0.117</version>
+</dep>
diff --git a/qpid/java/management/common/build.xml b/qpid/java/management/common/build.xml
index ce2ec3a106..1a0520b1e3 100644
--- a/qpid/java/management/common/build.xml
+++ b/qpid/java/management/common/build.xml
@@ -19,8 +19,9 @@
-
-->
<project name="Management Common" default="build">
-
<import file="../../module.xml"/>
+ <property name="module.genpom" value="true"/>
+
<target name="bundle" depends="bundle-tasks"/>
</project>
diff --git a/qpid/java/management/common/src/main/java/management-common.bnd b/qpid/java/management/common/src/main/java/management-common.bnd
index 5c39329f2f..5f7f8bede3 100644
--- a/qpid/java/management/common/src/main/java/management-common.bnd
+++ b/qpid/java/management/common/src/main/java/management-common.bnd
@@ -17,7 +17,7 @@
# under the License.
#
-ver: 0.13.0
+ver: 0.15.0
Bundle-SymbolicName: qpid-management-common
Bundle-Version: ${ver}
diff --git a/qpid/java/management/eclipse-plugin/META-INF/MANIFEST.MF b/qpid/java/management/eclipse-plugin/META-INF/MANIFEST.MF
index 0df308dff1..f47bc86797 100644
--- a/qpid/java/management/eclipse-plugin/META-INF/MANIFEST.MF
+++ b/qpid/java/management/eclipse-plugin/META-INF/MANIFEST.MF
@@ -3,7 +3,7 @@ Bundle-License: http://www.apache.org/licenses/LICENSE-2.0.txt
Bundle-ManifestVersion: 2
Bundle-Name: Qpid JMX Management Console Plug-in
Bundle-SymbolicName: org.apache.qpid.management.ui; singleton:=true
-Bundle-Version: 0.13.0
+Bundle-Version: 0.15.0
Bundle-Activator: org.apache.qpid.management.ui.Activator
Bundle-Vendor: Apache Software Foundation
Bundle-Localization: plugin
diff --git a/qpid/java/systests/etc/config-systests-firewall-2.xml b/qpid/java/systests/etc/config-systests-firewall-2.xml
index 2549a7e6c4..a9fd86b8e5 100644
--- a/qpid/java/systests/etc/config-systests-firewall-2.xml
+++ b/qpid/java/systests/etc/config-systests-firewall-2.xml
@@ -32,8 +32,8 @@
<ssl>
<enabled>false</enabled>
<sslOnly>false</sslOnly>
- <keystorePath>/path/to/keystore.ks</keystorePath>
- <keystorePassword>keystorepass</keystorePassword>
+ <keyStorePath>/path/to/keystore.ks</keyStorePath>
+ <keyStorePassword>keystorepass</keyStorePassword>
</ssl>
<port>5672</port>
<sslport>8672</sslport>
diff --git a/qpid/java/systests/etc/config-systests-firewall-3.xml b/qpid/java/systests/etc/config-systests-firewall-3.xml
index 0cafb6d70a..f0f3423f43 100644
--- a/qpid/java/systests/etc/config-systests-firewall-3.xml
+++ b/qpid/java/systests/etc/config-systests-firewall-3.xml
@@ -28,12 +28,12 @@
<connector>
<!-- To enable SSL edit the keystorePath and keystorePassword
and set enabled to true.
- To disasble Non-SSL port set sslOnly to true -->
+ To disable Non-SSL port set sslOnly to true -->
<ssl>
<enabled>false</enabled>
<sslOnly>false</sslOnly>
- <keystorePath>/path/to/keystore.ks</keystorePath>
- <keystorePassword>keystorepass</keystorePassword>
+ <keyStorePath>/path/to/keystore.ks</keyStorePath>
+ <keyStorePassword>keystorepass</keyStorePassword>
</ssl>
<port>5672</port>
<sslport>8672</sslport>
diff --git a/qpid/java/systests/etc/config-systests-settings.xml b/qpid/java/systests/etc/config-systests-settings.xml
index 5ed208bfe7..88533400d3 100644
--- a/qpid/java/systests/etc/config-systests-settings.xml
+++ b/qpid/java/systests/etc/config-systests-settings.xml
@@ -25,8 +25,8 @@
<port>15671</port>
<enabled>false</enabled>
<sslOnly>false</sslOnly>
- <keystorePath>${QPID_HOME}/../test-profiles/test_resources/ssl/java_broker_keystore.jks</keystorePath>
- <keystorePassword>password</keystorePassword>
+ <keyStorePath>${QPID_HOME}/../test-profiles/test_resources/ssl/java_broker_keystore.jks</keyStorePath>
+ <keyStorePassword>password</keyStorePassword>
</ssl>
</connector>
<management>
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
index 986297bfe1..7ea4416f3b 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
@@ -35,13 +35,11 @@ import org.slf4j.LoggerFactory;
public class AMQQueueDeferredOrderingTest extends QpidBrokerTestCase
{
-
- private static final int NUM_MESSAGES = 1000;
-
private Connection con;
private Session session;
private AMQQueue queue;
private MessageConsumer consumer;
+ private int numMessages;
private static final Logger _logger = LoggerFactory.getLogger(AMQQueueDeferredOrderingTest.class);
@@ -87,6 +85,8 @@ public class AMQQueueDeferredOrderingTest extends QpidBrokerTestCase
{
super.setUp();
+ numMessages = isBrokerStorePersistent() ? 300 : 1000;
+
_logger.info("Create Connection");
con = getConnection();
_logger.info("Create Session");
@@ -105,19 +105,19 @@ public class AMQQueueDeferredOrderingTest extends QpidBrokerTestCase
// Setup initial messages
_logger.info("Creating first producer thread");
- producerThread = new ASyncProducer(queue, 0, NUM_MESSAGES / 2);
+ producerThread = new ASyncProducer(queue, 0, numMessages / 2);
producerThread.start();
// Wait for them to be done
producerThread.join();
// Setup second set of messages to produce while we consume
_logger.info("Creating second producer thread");
- producerThread = new ASyncProducer(queue, NUM_MESSAGES / 2, NUM_MESSAGES);
+ producerThread = new ASyncProducer(queue, numMessages / 2, numMessages);
producerThread.start();
// Start consuming and checking they're in order
_logger.info("Consuming messages");
- for (int i = 0; i < NUM_MESSAGES; i++)
+ for (int i = 0; i < numMessages; i++)
{
Message msg = consumer.receive(3000);
assertNotNull("Message should not be null", msg);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java
index 303da29389..40a0d32b01 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java
@@ -135,7 +135,7 @@ public class ResetMessageListenerTest extends QpidBrokerTestCase
try
{
assertTrue("Did not receive all first batch of messages",
- _allFirstMessagesSent.await(1000, TimeUnit.MILLISECONDS));
+ _allFirstMessagesSent.await(MSG_COUNT, TimeUnit.SECONDS));
_logger.info("Received first batch of messages");
}
catch (InterruptedException e)
@@ -212,7 +212,7 @@ public class ResetMessageListenerTest extends QpidBrokerTestCase
try
{
- assertTrue(_allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS));
+ assertTrue(_allSecondMessagesSent.await(MSG_COUNT, TimeUnit.SECONDS));
}
catch (InterruptedException e)
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java
new file mode 100644
index 0000000000..980fc7285d
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java
@@ -0,0 +1,14 @@
+package org.apache.qpid.client.failover;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+public class AddressBasedFailoverBehaviourTest extends FailoverBehaviourTest
+{
+ @Override
+ protected Destination createDestination(Session session) throws JMSException
+ {
+ return session.createQueue("ADDR:" +getTestQueueName() + "_" + System.currentTimeMillis() + "; {create: always}");
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
index 66f8fe0546..a5b9c618bc 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
@@ -20,6 +20,7 @@ package org.apache.qpid.client.failover;
import java.text.MessageFormat;
import java.util.ArrayList;
+import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -33,12 +34,15 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.test.utils.FailoverBaseCase;
/**
@@ -96,6 +100,11 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
*/
private JMSException _exceptionListenerException;
+ /**
+ * Latch to check that failover mutex is hold by a failover thread
+ */
+ private CountDownLatch _failoverStarted;
+
@Override
protected void setUp() throws Exception
{
@@ -105,6 +114,7 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
_connection.setExceptionListener(this);
((AMQConnection) _connection).setConnectionListener(this);
_failoverComplete = new CountDownLatch(1);
+ _failoverStarted = new CountDownLatch(1);
}
/**
@@ -625,8 +635,134 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
sessionCloseAfterFailoverImpl(Session.AUTO_ACKNOWLEDGE);
}
+ public void testPublishAutoAcknowledgedWhileFailover() throws Exception
+ {
+ publishWhileFailingOver(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testPublishClientAcknowledgedWhileFailover() throws Exception
+ {
+ Message receivedMessage = publishWhileFailingOver(Session.CLIENT_ACKNOWLEDGE);
+ receivedMessage.acknowledge();
+ }
+
+ public void testPublishTransactedAcknowledgedWhileFailover() throws Exception
+ {
+ publishWhileFailingOver(Session.SESSION_TRANSACTED);
+ _consumerSession.commit();
+ }
+
+ public void testPublishAutoAcknowledgedWithFailoverMutex() throws Exception
+ {
+ publishWithFailoverMutex(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testPublishClientAcknowledgedWithFailoverMutex() throws Exception
+ {
+ publishWithFailoverMutex(Session.CLIENT_ACKNOWLEDGE);
+
+ }
+
+ public void testPublishTransactedAcknowledgedWithFailoverMutex() throws Exception
+ {
+ publishWithFailoverMutex(Session.SESSION_TRANSACTED);
+ }
+
+ public void testClientAcknowledgedSessionCloseWhileFailover() throws Exception
+ {
+ sessionCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ public void testTransactedSessionCloseWhileFailover() throws Exception
+ {
+ sessionCloseWhileFailoverImpl(Session.SESSION_TRANSACTED);
+ }
+
+ public void testAutoAcknowledgedSessionCloseWhileFailover() throws Exception
+ {
+ sessionCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testClientAcknowledgedQueueBrowserCloseWhileFailover() throws Exception
+ {
+ browserCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ public void testTransactedQueueBrowserCloseWhileFailover() throws Exception
+ {
+ browserCloseWhileFailoverImpl(Session.SESSION_TRANSACTED);
+ }
+
+ public void testAutoAcknowledgedQueueBrowserCloseWhileFailover() throws Exception
+ {
+ browserCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ private Message publishWhileFailingOver(int autoAcknowledge) throws JMSException, InterruptedException
+ {
+ setDelayedFailoverPolicy(5);
+ init(autoAcknowledge, true);
+
+ String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0);
+ Message message = _producerSession.createTextMessage(text);
+
+ failBroker(getFailingPort());
+
+ if(!_failoverStarted.await(5, TimeUnit.SECONDS))
+ {
+ fail("Did not receieve notification failover had started");
+ }
+
+ _producer.send(message);
+
+ if (_producerSession.getTransacted())
+ {
+ _producerSession.commit();
+ }
+
+ Message receivedMessage = _consumer.receive(1000l);
+ assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
+ return receivedMessage;
+ }
+
+ private void publishWithFailoverMutex(int autoAcknowledge) throws JMSException, InterruptedException
+ {
+ setDelayedFailoverPolicy(5);
+ init(autoAcknowledge, true);
+
+ String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0);
+ Message message = _producerSession.createTextMessage(text);
+
+ AMQConnection connection = (AMQConnection)_connection;
+
+ // holding failover mutex should prevent the failover from
+ // proceeding before we try to send the message
+ synchronized(connection.getFailoverMutex())
+ {
+ failBroker(getFailingPort());
+
+ // wait to make sure that connection is lost
+ while(!connection.isFailingOver())
+ {
+ Thread.sleep(25l);
+ }
+
+ try
+ {
+ _producer.send(message);
+ fail("Sending should fail because connection was lost and failover has not yet completed");
+ }
+ catch(JMSException e)
+ {
+ // JMSException is expected
+ }
+ }
+ // wait for failover completion, thus ensuring it actually
+ //got started, before allowing the test to tear down
+ awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
+ }
/**
- * Tests {@link Session#close()} for session with given acknowledge mode
+ * Tests {@link Session#close()} for session with given acknowledge mode
* to ensure that close works after failover.
*
* @param acknowledgeMode session acknowledge mode
@@ -671,7 +807,7 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
boolean isTransacted = acknowledgeMode == Session.SESSION_TRANSACTED ? true : false;
_consumerSession = _connection.createSession(isTransacted, acknowledgeMode);
- _destination = _consumerSession.createQueue(getTestQueueName() + "_" + System.currentTimeMillis());
+ _destination = createDestination(_consumerSession);
_consumer = _consumerSession.createConsumer(_destination);
if (startConnection)
@@ -684,6 +820,11 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
}
+ protected Destination createDestination(Session session) throws JMSException
+ {
+ return session.createQueue(getTestQueueName() + "_" + System.currentTimeMillis());
+ }
+
/**
* Resends messages if reconnected to a non-clustered broker
*
@@ -879,6 +1020,7 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
@Override
public boolean preFailover(boolean redirect)
{
+ _failoverStarted.countDown();
return true;
}
@@ -900,6 +1042,39 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
_exceptionListenerException = e;
}
+ /**
+ * Causes 1 second delay before reconnect in order to test whether JMS
+ * methods block while failover is in progress
+ */
+ private static class DelayingFailoverPolicy extends FailoverPolicy
+ {
+
+ private CountDownLatch _suspendLatch;
+ private long _delay;
+
+ public DelayingFailoverPolicy(AMQConnection connection, long delay)
+ {
+ super(connection.getConnectionURL(), connection);
+ _suspendLatch = new CountDownLatch(1);
+ _delay = delay;
+ }
+
+ public void attainedConnection()
+ {
+ try
+ {
+ _suspendLatch.await(_delay, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ // continue
+ }
+ super.attainedConnection();
+ }
+
+ }
+
+
private class FailoverTestMessageListener implements MessageListener
{
// message counter
@@ -946,4 +1121,101 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
return _counter.get();
}
}
+
+ /**
+ * Tests {@link Session#close()} for session with given acknowledge mode
+ * to ensure that it blocks until failover implementation restores connection.
+ *
+ * @param acknowledgeMode session acknowledge mode
+ * @throws JMSException
+ */
+ private void sessionCloseWhileFailoverImpl(int acknowledgeMode) throws JMSException
+ {
+ initDelayedFailover(acknowledgeMode);
+
+ // intentionally receive message but not commit or acknowledge it in
+ // case of transacted or CLIENT_ACK session
+ Message receivedMessage = _consumer.receive(1000l);
+ assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
+
+ failBroker(getFailingPort());
+
+ // test whether session#close blocks while failover is in progress
+ _consumerSession.close();
+
+ assertFailoverException();
+ }
+
+ /**
+ * A helper method to instantiate {@link QueueBrowser} and publish test messages on a test queue for further browsing.
+ *
+ * @param acknowledgeMode session acknowledge mode
+ * @return queue browser
+ * @throws JMSException
+ */
+ private QueueBrowser prepareQueueBrowser(int acknowledgeMode) throws JMSException
+ {
+ init(acknowledgeMode, false);
+ _consumer.close();
+ QueueBrowser browser = _consumerSession.createBrowser((Queue) _destination);
+ _connection.start();
+
+ produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
+ if (acknowledgeMode == Session.SESSION_TRANSACTED)
+ {
+ _producerSession.commit();
+ }
+ return browser;
+ }
+
+ /**
+ * Tests {@link QueueBrowser#close()} for session with given acknowledge mode
+ * to ensure that it blocks until failover implementation restores connection.
+ *
+ * @param acknowledgeMode session acknowledge mode
+ * @throws JMSException
+ */
+ private void browserCloseWhileFailoverImpl(int acknowledgeMode) throws JMSException
+ {
+ setDelayedFailoverPolicy();
+
+ QueueBrowser browser = prepareQueueBrowser(acknowledgeMode);
+
+ @SuppressWarnings("unchecked")
+ Enumeration<Message> messages = browser.getEnumeration();
+ Message receivedMessage = (Message) messages.nextElement();
+ assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
+
+ failBroker(getFailingPort());
+
+ browser.close();
+
+ assertFailoverException();
+ }
+
+ private DelayingFailoverPolicy initDelayedFailover(int acknowledgeMode) throws JMSException
+ {
+ DelayingFailoverPolicy failoverPolicy = setDelayedFailoverPolicy();
+ init(acknowledgeMode, true);
+ produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
+ if (acknowledgeMode == Session.SESSION_TRANSACTED)
+ {
+ _producerSession.commit();
+ }
+ return failoverPolicy;
+ }
+
+ private DelayingFailoverPolicy setDelayedFailoverPolicy()
+ {
+ return setDelayedFailoverPolicy(2);
+ }
+
+ private DelayingFailoverPolicy setDelayedFailoverPolicy(long delay)
+ {
+ AMQConnection amqConnection = (AMQConnection) _connection;
+ DelayingFailoverPolicy failoverPolicy = new DelayingFailoverPolicy(amqConnection, delay);
+ ((AMQConnection) _connection).setFailoverPolicy(failoverPolicy);
+ return failoverPolicy;
+ }
+
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
new file mode 100644
index 0000000000..c0b07f239b
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
@@ -0,0 +1,116 @@
+package org.apache.qpid.client.prefetch;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PrefetchBehaviourTest extends QpidBrokerTestCase
+{
+ protected static final Logger _logger = LoggerFactory.getLogger(PrefetchBehaviourTest.class);
+ private Connection _normalConnection;
+ private AtomicBoolean _exceptionCaught;
+ private CountDownLatch _processingStarted;
+ private CountDownLatch _processingCompleted;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _normalConnection = getConnection();
+ _exceptionCaught = new AtomicBoolean();
+ _processingStarted = new CountDownLatch(1);
+ _processingCompleted = new CountDownLatch(1);
+ }
+
+ /**
+ * Verifies that a slow processing asynchronous transacted consumer with prefetch=1 only
+ * gets 1 of the messages sent, with the second consumer picking up the others while the
+ * slow consumer is processing, i.e that prefetch=1 actually does what it says on the tin.
+ */
+ public void testPrefetchOneWithAsynchronousTransactedConsumer() throws Exception
+ {
+ final long processingTime = 5000;
+
+ //create a second connection with prefetch set to 1
+ setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString());
+ Connection prefetch1Connection = getConnection();
+
+ prefetch1Connection.start();
+ _normalConnection.start();
+
+ //create an asynchronous consumer with simulated slow processing
+ final Session prefetch1session = prefetch1Connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = prefetch1session.createQueue(getTestQueueName());
+ MessageConsumer prefetch1consumer = prefetch1session.createConsumer(queue);
+ prefetch1consumer.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ try
+ {
+ _logger.debug("starting processing");
+ _processingStarted.countDown();
+ _logger.debug("processing started");
+
+ //simulate message processing
+ Thread.sleep(processingTime);
+
+ prefetch1session.commit();
+
+ _processingCompleted.countDown();
+ }
+ catch(Exception e)
+ {
+ _logger.error("Exception caught in message listener");
+ _exceptionCaught.set(true);
+ }
+ }
+ });
+
+ //create producer and send 5 messages
+ Session producerSession = _normalConnection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ for (int i = 0; i < 5; i++)
+ {
+ producer.send(producerSession.createTextMessage("test"));
+ }
+ producerSession.commit();
+
+ //wait for the first message to start being processed by the async consumer
+ assertTrue("Async processing failed to start in allowed timeframe", _processingStarted.await(2000, TimeUnit.MILLISECONDS));
+ _logger.debug("proceeding with test");
+
+ //try to consumer the other messages with another consumer while the async procesisng occurs
+ Session normalSession = _normalConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer normalConsumer = normalSession.createConsumer(queue);
+
+ Message msg;
+ // Check that other consumer gets the other 4 messages
+ for (int i = 0; i < 4; i++)
+ {
+ msg = normalConsumer.receive(1500);
+ assertNotNull("Consumer should receive 4 messages",msg);
+ }
+ msg = normalConsumer.receive(250);
+ assertNull("Consumer should not have received a 5th message",msg);
+
+ //wait for the other consumer to finish to ensure it completes ok
+ _logger.debug("waiting for async consumer to complete");
+ assertTrue("Async processing failed to complete in allowed timeframe", _processingStarted.await(processingTime + 2000, TimeUnit.MILLISECONDS));
+ assertFalse("Unexpecte exception during async message processing",_exceptionCaught.get());
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java
index 460270e188..277e84d66d 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java
@@ -122,7 +122,8 @@ public class MultipleTransactedBatchProducerTest extends QpidBrokerTestCase
producer2.start();
//await delivery of the messages
- boolean result = _receivedLatch.await(75, TimeUnit.SECONDS);
+ int timeout = isBrokerStorePersistent() ? 300 : 75;
+ boolean result = _receivedLatch.await(timeout, TimeUnit.SECONDS);
assertNull("Test failed because: " + String.valueOf(_failMsg), _failMsg);
assertTrue("Some of the messages were not all recieved in the given timeframe, remaining count was: "+_receivedLatch.getCount(),
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
index 74f50e8659..aeeecb2dff 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
@@ -110,18 +110,13 @@ public class QueueDepthWithSelectorTest extends QpidBrokerTestCase
try
{
Connection connection = getConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ AMQSession session = (AMQSession)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Thread.sleep(2000);
- long queueDepth = ((AMQSession) session).getQueueDepth((AMQDestination) _queue);
+ long queueDepth = session.getQueueDepth((AMQDestination) _queue);
assertEquals("Session reports Queue depth not as expected", expectedDepth, queueDepth);
connection.close();
}
- catch (InterruptedException e)
- {
- fail(e.getMessage());
- }
catch (AMQException e)
{
fail(e.getMessage());
@@ -158,6 +153,10 @@ public class QueueDepthWithSelectorTest extends QpidBrokerTestCase
{
assertTrue("Message " + msgId + " not received.", msgIdRecevied[msgId]);
}
+
+ //do a synchronous op to ensure the acks are processed
+ //on the broker before proceeding
+ ((AMQSession)_clientSession).sync();
}
/**
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java
index 13a9dd73b8..107c730a7e 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java
@@ -94,7 +94,7 @@ public class CancelTest extends QpidBrokerTestCase
browser.close();
MessageConsumer consumer = _clientSession.createConsumer(_queue);
- assertNotNull( consumer.receive() );
+ assertNotNull( consumer.receive(2000l) );
consumer.close();
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
index d6caf05d33..4eb328f091 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
@@ -261,8 +261,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
protected void checkOverlappingMultipleGetEnum(int expectedMessages, int browserEnumerationCount, String selector) throws JMSException
{
QueueBrowser queueBrowser = selector == null ?
- _clientSession.createBrowser(_queue) : _clientSession.createBrowser(_queue);
-// _clientSession.createBrowser(_queue) : _clientSession.createBrowser(_queue, selector);
+ _clientSession.createBrowser(_queue) : _clientSession.createBrowser(_queue, selector);
Enumeration[] msgs = new Enumeration[browserEnumerationCount];
int[] msgCount = new int[browserEnumerationCount];
@@ -347,7 +346,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
protected void checkQueueDepthWithSelectors(int totalMessages, int clients) throws JMSException
{
- String selector = MESSAGE_ID_PROPERTY + " % " + clients;
+ String selector = MESSAGE_ID_PROPERTY + " % " + clients + " = 0" ;
checkOverlappingMultipleGetEnum(totalMessages / clients, clients, selector);
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index 8c3c247e2b..feae7c9573 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -1,4 +1,3 @@
-package org.apache.qpid.test.client.destination;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,11 +18,13 @@ package org.apache.qpid.test.client.destination;
* under the License.
*
*/
-
+package org.apache.qpid.test.client.destination;
import java.util.Collections;
+import java.util.Enumeration;
import java.util.HashMap;
import java.util.Hashtable;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -34,6 +35,7 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
+import javax.jms.QueueBrowser;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
@@ -475,13 +477,13 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
{
prod.send(jmsSession.createTextMessage("msg" + i) );
}
-
- for (int i=0; i< 9; i++)
+ Message msg = null;
+ for (int i=0; i< 10; i++)
{
- cons.receive();
+ msg = cons.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Should have received " + i + " message", msg);
+ assertEquals("Unexpected message received", "msg" + i, ((TextMessage)msg).getText());
}
- Message msg = cons.receive(RECEIVE_TIMEOUT);
- assertNotNull("Should have received the 10th message",msg);
assertNull("Shouldn't have received the 11th message as capacity is 10",cons.receive(RECEIVE_TIMEOUT));
msg.acknowledge();
for (int i=11; i<16; i++)
@@ -1068,19 +1070,6 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
{
assertTrue(e.getCause().getMessage().contains("The reliability mode 'exactly-once' is not yet supported"));
}
-
- String addr4 = "ADDR:amq.topic/test;{link : {reliability : at-least-once}}";
- try
- {
- AMQAnyDestination dest = new AMQAnyDestination(addr4);
- Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
- MessageConsumer cons = ssn.createConsumer(dest);
- fail("An exception should be thrown indicating it's an unsupported combination");
- }
- catch(Exception e)
- {
- assertTrue(e.getCause().getMessage().contains("AT-LEAST-ONCE is not yet supported for Topics"));
- }
}
private void acceptModeTest(String address, int expectedQueueDepth) throws Exception
@@ -1182,4 +1171,154 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
assertNotNull("The consumer on the queue bound to the alt-exchange should receive the message",cons.receive(1000));
cons.close();
}
+
+ public void testQueueBrowserWithSelectorAutoAcknowledgement() throws Exception
+ {
+ assertQueueBrowserWithSelector(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testQueueBrowserWithSelectorClientAcknowldgement() throws Exception
+ {
+ assertQueueBrowserWithSelector(Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ public void testQueueBrowserWithSelectorTransactedSession() throws Exception
+ {
+ assertQueueBrowserWithSelector(Session.SESSION_TRANSACTED);
+ }
+
+ public void testConsumerWithSelectorAutoAcknowledgement() throws Exception
+ {
+ assertConsumerWithSelector(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testConsumerWithSelectorClientAcknowldgement() throws Exception
+ {
+ assertConsumerWithSelector(Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ public void testConsumerWithSelectorTransactedSession() throws Exception
+ {
+ assertConsumerWithSelector(Session.SESSION_TRANSACTED);
+ }
+
+ private void assertQueueBrowserWithSelector(int acknowledgement) throws Exception
+ {
+ String queueAddress = "ADDR:" + getTestQueueName() + ";{create: always}";
+
+ boolean transacted = acknowledgement == Session.SESSION_TRANSACTED;
+ Session session = _connection.createSession(transacted, acknowledgement);
+
+ Queue queue = session.createQueue(queueAddress);
+
+ final int numberOfMessages = 10;
+ List<Message> sentMessages = sendMessage(session, queue, numberOfMessages);
+ assertNotNull("Messages were not sent", sentMessages);
+ assertEquals("Unexpected number of messages were sent", numberOfMessages, sentMessages.size());
+
+ QueueBrowser browser = session.createBrowser(queue, INDEX + "%2=0");
+ _connection.start();
+
+ Enumeration<Message> enumaration = browser.getEnumeration();
+
+ int counter = 0;
+ int expectedIndex = 0;
+ while (enumaration.hasMoreElements())
+ {
+ Message m = enumaration.nextElement();
+ assertNotNull("Expected not null message at step " + counter, m);
+ int messageIndex = m.getIntProperty(INDEX);
+ assertEquals("Unexpected index", expectedIndex, messageIndex);
+ expectedIndex += 2;
+ counter++;
+ }
+ assertEquals("Unexpected number of messsages received", 5, counter);
+ }
+
+ private void assertConsumerWithSelector(int acknowledgement) throws Exception
+ {
+ String queueAddress = "ADDR:" + getTestQueueName() + ";{create: always}";
+
+ boolean transacted = acknowledgement == Session.SESSION_TRANSACTED;
+ Session session = _connection.createSession(transacted, acknowledgement);
+
+ Queue queue = session.createQueue(queueAddress);
+
+ final int numberOfMessages = 10;
+ List<Message> sentMessages = sendMessage(session, queue, numberOfMessages);
+ assertNotNull("Messages were not sent", sentMessages);
+ assertEquals("Unexpected number of messages were sent", numberOfMessages, sentMessages.size());
+
+ MessageConsumer consumer = session.createConsumer(queue, INDEX + "%2=0");
+
+ int expectedIndex = 0;
+ for (int i = 0; i < 5; i++)
+ {
+ Message m = consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Expected not null message at step " + i, m);
+ int messageIndex = m.getIntProperty(INDEX);
+ assertEquals("Unexpected index", expectedIndex, messageIndex);
+ expectedIndex += 2;
+
+ if (transacted)
+ {
+ session.commit();
+ }
+ else if (acknowledgement == Session.CLIENT_ACKNOWLEDGE)
+ {
+ m.acknowledge();
+ }
+ }
+
+ Message m = consumer.receive(RECEIVE_TIMEOUT);
+ assertNull("Unexpected message received", m);
+ }
+
+ /**
+ * Tests that a client using a session in {@link Session#CLIENT_ACKNOWLEDGE} can correctly
+ * recover a session and re-receive the same message.
+ */
+ public void testTopicRereceiveAfterRecover() throws Exception
+ {
+ final Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
+ final Destination topic = jmsSession.createTopic("ADDR:amq.topic/topic1; {link:{name: queue1}}");
+
+ final MessageProducer prod = jmsSession.createProducer(topic);
+ final MessageConsumer consForTopic1 = jmsSession.createConsumer(topic);
+ final Message sentMessage = jmsSession.createTextMessage("Hello");
+
+ prod.send(sentMessage);
+ Message receivedMessage = consForTopic1.receive(1000);
+ assertNotNull("message should be received by consumer", receivedMessage);
+
+ jmsSession.recover();
+ receivedMessage = consForTopic1.receive(1000);
+ assertNotNull("message should be re-received by consumer after recover", receivedMessage);
+ receivedMessage.acknowledge();
+ }
+
+ /**
+ * Tests that a client using a session in {@link Session#SESSION_TRANSACTED} can correctly
+ * rollback a session and re-receive the same message.
+ */
+ public void testTopicRereceiveAfterRollback() throws Exception
+ {
+ final Session jmsSession = _connection.createSession(true,Session.SESSION_TRANSACTED);
+ final Destination topic = jmsSession.createTopic("ADDR:amq.topic/topic1; {link:{name: queue1}}");
+
+ final MessageProducer prod = jmsSession.createProducer(topic);
+ final MessageConsumer consForTopic1 = jmsSession.createConsumer(topic);
+ final Message sentMessage = jmsSession.createTextMessage("Hello");
+
+ prod.send(sentMessage);
+ jmsSession.commit();
+
+ Message receivedMessage = consForTopic1.receive(1000);
+ assertNotNull("message should be received by consumer", receivedMessage);
+
+ jmsSession.rollback();
+ receivedMessage = consForTopic1.receive(1000);
+ assertNotNull("message should be re-received by consumer after rollback", receivedMessage);
+ jmsSession.commit();
+ }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
index 53a7533869..57ff6a4fa2 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
@@ -205,7 +205,7 @@ public class AMQConnectionTest extends QpidBrokerTestCase
Session consSessA = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumerA = consSessA.createConsumer(_queue);
- Session producerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = producerSession.createProducer(_queue);
// Send 3 messages
@@ -213,6 +213,7 @@ public class AMQConnectionTest extends QpidBrokerTestCase
{
producer.send(producerSession.createTextMessage("test"));
}
+ producerSession.commit();
MessageConsumer consumerB = null;
// 0-8, 0-9, 0-9-1 prefetch is per session, not consumer.
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index d799b141c0..9ea116ae53 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -262,7 +262,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
producer.send(session1.createTextMessage("B"));
_logger.info("Receive message on consumer 1 :expecting B");
- msg = consumer1.receive(500);
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull("Consumer 1 should get message 'B'.", msg);
assertEquals("Incorrect Message received on consumer1.", "B", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
@@ -287,13 +287,13 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
else
{
_logger.info("Receive message on consumer 3 :expecting B");
- msg = consumer3.receive(500);
+ msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull("Consumer 3 should get message 'B'.", msg);
assertEquals("Incorrect Message received on consumer3.", "B", ((TextMessage) msg).getText());
}
_logger.info("Receive message on consumer 1 :expecting C");
- msg = consumer1.receive(500);
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull("Consumer 1 should get message 'C'.", msg);
assertEquals("Incorrect Message received on consumer1.", "C", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
@@ -301,7 +301,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
assertNull("There should be no more messages for consumption on consumer1.", msg);
_logger.info("Receive message on consumer 3 :expecting C");
- msg = consumer3.receive(500);
+ msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull("Consumer 3 should get message 'C'.", msg);
assertEquals("Incorrect Message received on consumer3.", "C", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 3 :expecting null");
@@ -358,7 +358,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
// Send message and check that both consumers get it and only it.
producer.send(session0.createTextMessage("A"));
- msg = consumer1.receive(500);
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull("Message should be available", msg);
assertEquals("Message Text doesn't match", "A", ((TextMessage) msg).getText());
msg = consumer1.receive(500);
@@ -729,7 +729,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
conn.start();
- Message rMsg = subB.receive(1000);
+ Message rMsg = subB.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull(rMsg);
assertEquals("Content was wrong",
"testResubscribeWithChangedSelectorAndRestart2",
@@ -797,7 +797,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
conn.start();
- Message rMsg = subTwo.receive(1000);
+ Message rMsg = subTwo.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull(rMsg);
assertEquals("Content was wrong",
"testResubscribeWithChangedSelectorAndRestart1",
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
index 0b1aeef8e9..826545a23d 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
@@ -20,17 +20,22 @@
*/
package org.apache.qpid.test.unit.topic;
+import javax.jms.Connection;
import javax.jms.InvalidDestinationException;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQTopicSessionAdaptor;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -306,51 +311,39 @@ public class TopicSessionTest extends QpidBrokerTestCase
}
/**
- * This tests QPID-1191, where messages which are sent to a topic but are not consumed by a subscriber
- * due to a selector can be leaked.
- * @throws Exception
+ * This tests was added to demonstrate QPID-3542. The Java Client when used with the CPP Broker was failing to
+ * ack messages received that did not match the selector. This meant the messages remained indefinitely on the Broker.
*/
- public void testNonMatchingMessagesDoNotFillQueue() throws Exception
+ public void testNonMatchingMessagesHandledCorrectly() throws Exception
{
- AMQConnection con = (AMQConnection) getConnection("guest", "guest");
-
- // Setup Topic
- AMQTopic topic = new AMQTopic(con, "testNoLocal");
-
- TopicSession session = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
+ final String topicName = getName();
+ final String clientId = "clientId" + topicName;
+ final Connection con1 = getConnection();
+ final Session session1 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Topic topic1 = session1.createTopic(topicName);
+ final AMQQueue internalNameOnBroker = new AMQQueue("amq.topic", "clientid" + ":" + clientId);
// Setup subscriber with selector
- TopicSubscriber selector = session.createSubscriber(topic, "Selector = 'select'", false);
- TopicPublisher publisher = session.createPublisher(topic);
+ final TopicSubscriber subscriberWithSelector = session1.createDurableSubscriber(topic1, clientId, "Selector = 'select'", false);
+ final MessageProducer publisher = session1.createProducer(topic1);
- con.start();
- TextMessage m;
- TextMessage message;
+ con1.start();
// Send non-matching message
- message = session.createTextMessage("non-matching 1");
- publisher.publish(message);
- session.commit();
+ final Message sentMessage = session1.createTextMessage("hello");
+ sentMessage.setStringProperty("Selector", "nonMatch");
+ publisher.send(sentMessage);
- // Send and consume matching message
- message = session.createTextMessage("hello");
- message.setStringProperty("Selector", "select");
-
- publisher.publish(message);
- session.commit();
-
- m = (TextMessage) selector.receive(1000);
- assertNotNull("should have received message", m);
- assertEquals("Message contents were wrong", "hello", m.getText());
-
- // Send non-matching message
- message = session.createTextMessage("non-matching 2");
- publisher.publish(message);
- session.commit();
+ // Try to consume non-message, expect this to fail.
+ final Message message1 = subscriberWithSelector.receive(1000);
+ assertNull("should not have received message", message1);
+ subscriberWithSelector.close();
- // Assert queue count is 0
- long depth = ((AMQTopicSessionAdaptor) session).getSession().getQueueDepth(topic);
- assertEquals("Queue depth was wrong", 0, depth);
+ session1.close();
+ // Now verify queue depth on broker.
+ final Session session2 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final long depth = ((AMQSession) session2).getQueueDepth(internalNameOnBroker);
+ assertEquals("Expected queue depth of zero", 0, depth);
}
}
diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes
index d05f42e4b7..39b4d542db 100755
--- a/qpid/java/test-profiles/CPPExcludes
+++ b/qpid/java/test-profiles/CPPExcludes
@@ -64,9 +64,6 @@ org.apache.qpid.test.unit.client.connection.ConnectionTest#testDefaultExchanges
// 0-10 c++ broker in cpp.testprofile is started with no auth so won't pass this test
org.apache.qpid.test.unit.client.connection.ConnectionTest#testPasswordFailureConnection
-// c++ broker doesn't do selectors, so this will fail
-org.apache.qpid.test.unit.topic.TopicSessionTest#testNonMatchingMessagesDoNotFillQueue
-
// InVM Broker tests
org.apache.qpid.test.client.timeouts.SyncWaitDelayTest#*
diff --git a/qpid/java/test-profiles/JavaPre010Excludes b/qpid/java/test-profiles/JavaPre010Excludes
index 8344cd98c2..68feaf1e2b 100644
--- a/qpid/java/test-profiles/JavaPre010Excludes
+++ b/qpid/java/test-profiles/JavaPre010Excludes
@@ -28,6 +28,7 @@ org.apache.qpid.test.client.message.JMSDestinationTest#testReceiveResend
org.apache.qpid.test.client.message.JMSDestinationTest#testGetDestinationWithCustomExchange
// The new addressing based syntax is not supported for AMQP 0-8/0-9 versions
+org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#*
org.apache.qpid.test.client.destination.AddressBasedDestinationTest#*
org.apache.qpid.test.client.queue.QueuePolicyTest#testRingPolicy
org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy
diff --git a/qpid/java/test-profiles/python_tests/Java010PythonExcludes b/qpid/java/test-profiles/python_tests/Java010PythonExcludes
index 31d2a8affc..10e6298634 100644
--- a/qpid/java/test-profiles/python_tests/Java010PythonExcludes
+++ b/qpid/java/test-profiles/python_tests/Java010PythonExcludes
@@ -1,22 +1,115 @@
-//
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//
-
-// QPID-3477: Java broker does not handle rejection code specified in test
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+###### Feature not supported in Java Broker ######
+
+#The broker does not support DTX
+qpid_tests.broker_0_10.dtx.*
+
+#The broker does not support message groups
+qpid_tests.broker_0_10.msg_groups.*
+
+#The broker does not have the appropriate QMF support
+qpid_tests.broker_0_10.management.*
+
+#The broker does not use the same threshold alerting system (or the QMF support needed for the tests)
+qpid_tests.broker_0_10.threshold.*
+
+#The broker does not support the policy extension
+qpid_tests.broker_0_10.extensions.ExtensionTests.test_policy_*
+
+#The broker does not support the timed-autodelete extension
+qpid_tests.broker_0_10.extensions.ExtensionTests.test_timed_autodelete
+
+#The broker does not support ring queues, fairshare, or the priority alias
+qpid_tests.broker_0_10.priority.PriorityTests.test_ring_queue*
+qpid_tests.broker_0_10.priority.PriorityTests.test_fairshare*
+qpid_tests.broker_0_10.priority.PriorityTests.test_prioritised_delivery_with_alias
+
+
+###### Behavioural differences between Java & CPP Broker ######
+
+#QPID-3587 Java broker does not alter queue counts until msgs are accepted.
+qpid_tests.broker_0_10.message.MessageTests.test_ack
+qpid_tests.broker_0_10.message.MessageTests.test_acquire
+qpid_tests.broker_0_10.message.MessageTests.test_acquire_with_no_accept_and_credit_flow
+
+#QPID-3588 Java broker sets expiration and doesnt pass TTL on to consumer
+qpid.tests.messaging.message.MessageEchoTests.testProperties
+
+#QPID-3589 Difference in exception text message causes test to fail
+qpid.tests.messaging.endpoints.AddressTests.testDeleteSpecial
+
+#QPID-3590 Java broker does not support null value for routing key
+qpid.tests.messaging.endpoints.SessionTests.testDoubleCommit
+
+
+###### Java Broker defects ######
+
+#QPID-3591 Fails due to bytes credit issue
+qpid_tests.broker_0_10.message.MessageTests.test_credit_flow_bytes
+qpid_tests.broker_0_10.message.MessageTests.test_window_flow_bytes
+
+#QPID-3592 Fails to receive more messages after restart
+qpid_tests.broker_0_10.message.MessageTests.test_window_stop
+
+#QPID-3539 Tests fail because is incorrectly being done per session and not connection
+qpid_tests.broker_0_10.message.MessageTests.test_no_local
+qpid_tests.broker_0_10.message.MessageTests.test_no_local_awkward
+qpid_tests.broker_0_10.message.MessageTests.test_no_local_exclusive_subscribe
+
+#QPID-3593 Priority Queue test failures
+qpid_tests.broker_0_10.priority.PriorityTests.test_browsing
+qpid_tests.broker_0_10.priority.PriorityTests.test_prioritised_delivery_1
+qpid_tests.broker_0_10.priority.PriorityTests.test_prioritised_delivery_2
+qpid_tests.broker_0_10.priority.PriorityTests.test_requeue
+
+#QPID-3594 exclusive queues problem
+qpid_tests.broker_0_10.queue.QueueTests.test_declare_exclusive
+
+#QPID-3477: Java broker does not handle rejection code specified in test
qpid.tests.messaging.endpoints.SessionTests.testReject
+#QPID-3595 Alternate Exchanges support requires work to be spec compliant.
+qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_delete_while_used_by_queue
+qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_modify_existing_exchange_alternate
+qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_queue_autodelete
+qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_queue_delete
+qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_queue_delete_loop
+qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_queue_delete_no_match
+qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_reject_no_match
+qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_add_alternate_to_exchange
+
+#QPID-3596 Broker does not validate for reserved exchange names on create/bind.
+qpid_tests.broker_0_10.exchange.DeclareMethodExchangeFieldReservedRuleTests.*
+
+#QPID-3597 Headers exchange issues
+qpid_tests.broker_0_10.exchange.HeadersExchangeTests.*
+qpid_tests.broker_0_10.queue.QueueTests.test_unbind_headers
+qpid_tests.broker_0_10.exchange.RecommendedTypesRuleTests.testHeaders
+qpid_tests.broker_0_10.exchange.RequiredInstancesRuleTests.testAmqMatch
+qpid_tests.broker_0_10.query.QueryTests.test_exchange_bound_header
+
+#QPID-3598 Fanout exchange issues
+qpid_tests.broker_0_10.query.QueryTests.test_exchange_bound_fanout
+
+#QPID-3599 Tests fail due to differences in expected message Redelivered status
+qpid.tests.messaging.endpoints.SessionTests.testCommitAck
+qpid.tests.messaging.endpoints.SessionTests.testRelease
+qpid.tests.messaging.endpoints.SessionTests.testRollback