From e1126b978d046384380eeaa5a9b309618ddc7df5 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 1 Mar 2012 10:20:36 +0000 Subject: NO-JIRA: [AMQP 1-0 Sandbox] merging from trunk git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1295495 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/bdbstore/bin/backup.sh | 2 +- qpid/java/bdbstore/bin/storeUpgrade.sh | 2 +- qpid/java/bdbstore/build.xml | 8 +- .../berkeleydb/testclient/BackupTestClient.java | 120 --------- .../berkeleydb/testclient/BackupTestClient.java | 120 +++++++++ .../main/java/org/apache/qpid/info/AppInfo.java | 4 +- .../apache/qpid/info/systest/InfoPluginTest.java | 5 +- .../shutdown/src/main/java/shutdown.bnd | 2 +- qpid/java/broker/bin/qpid-server.bat | 4 +- qpid/java/broker/build.xml | 3 +- qpid/java/broker/etc/config.xml | 4 +- qpid/java/broker/etc/virtualhosts.xml | 15 +- qpid/java/broker/src/main/java/broker.bnd | 26 ++ .../main/java/org/apache/qpid/qmf/QMFService.java | 30 ++- .../main/java/org/apache/qpid/server/Broker.java | 6 +- .../qpid/server/configuration/BrokerConfig.java | 16 +- .../server/configuration/BrokerConfigType.java | 1 - .../server/configuration/ServerConfiguration.java | 52 +++- .../apache/qpid/server/federation/BrokerLink.java | 3 +- .../org/apache/qpid/server/message/AMQMessage.java | 99 +------- .../server/message/AbstractServerMessageImpl.java | 84 +++++++ .../qpid/server/message/MessageMetaData_0_10.java | 2 +- .../server/message/MessageTransferMessage.java | 9 +- .../server/message/TransferMessageReference.java | 4 +- .../qpid/server/protocol/AMQConnectionModel.java | 2 +- .../qpid/server/protocol/AMQProtocolEngine.java | 3 +- .../qpid/server/registry/BrokerConfigAdapter.java | 18 ++ .../qpid/server/subscription/SubscriptionImpl.java | 1 - .../qpid/server/transport/ServerConnection.java | 21 +- .../server/transport/ServerConnectionDelegate.java | 29 ++- .../qpid/server/transport/ServerSession.java | 15 +- .../server/transport/ServerSessionDelegate.java | 14 +- .../configuration/ServerConfigurationTest.java | 68 +++-- .../server/plugins/OsgiSystemPackageUtilTest.java | 4 +- .../qpid/server/store/ReferenceCountingTest.java | 10 +- qpid/java/build.deps | 2 +- qpid/java/build.overrides | 24 -- qpid/java/build.xml | 2 - qpid/java/client/src/main/java/client.bnd | 2 +- .../java/org/apache/qpid/client/AMQConnection.java | 23 ++ .../apache/qpid/client/AMQConnectionDelegate.java | 12 + .../qpid/client/AMQConnectionDelegate_0_10.java | 85 +++++-- .../qpid/client/AMQConnectionDelegate_8_0.java | 14 +- .../org/apache/qpid/client/AMQDestination.java | 18 +- .../java/org/apache/qpid/client/AMQSession.java | 70 +++--- .../org/apache/qpid/client/AMQSession_0_10.java | 179 +++++-------- .../org/apache/qpid/client/AMQSession_0_8.java | 20 +- .../apache/qpid/client/BasicMessageConsumer.java | 63 ++++- .../qpid/client/BasicMessageConsumer_0_10.java | 265 +++++++++++--------- .../qpid/client/BasicMessageConsumer_0_8.java | 27 +- .../qpid/client/BasicMessageProducer_0_10.java | 2 +- .../apache/qpid/client/messaging/address/Link.java | 8 +- .../qpid/client/protocol/AMQProtocolSession.java | 11 +- .../org/apache/qpid/filter/JMSSelectorFilter.java | 22 +- .../java/org/apache/qpid/filter/MessageFilter.java | 4 +- .../apache/qpid/client/AMQSession_0_10Test.java | 17 +- .../apache/qpid/filter/JMSSelectorFilterTest.java | 108 ++++++++ .../test/unit/message/MessageConverterTest.java | 7 +- .../qpid/test/unit/message/TestAMQSession.java | 9 +- qpid/java/common.xml | 2 +- qpid/java/common/src/main/java/common.bnd | 2 +- .../apache/qpid/common/ServerPropertyNames.java | 43 ++++ .../java/org/apache/qpid/framing/FieldTable.java | 9 +- .../java/org/apache/qpid/transport/Connection.java | 17 +- .../org/apache/qpid/transport/ServerDelegate.java | 7 + .../java/org/apache/qpid/transport/Session.java | 86 +++++-- .../main/java/org/apache/qpid/util/Strings.java | 15 ++ .../qpid/framing/PropertyFieldTableTest.java | 54 +++- qpid/java/ivy.xml | 20 +- qpid/java/lib/poms/je-4.0.117.xml | 22 ++ qpid/java/management/common/build.xml | 3 +- .../common/src/main/java/management-common.bnd | 2 +- .../management/eclipse-plugin/META-INF/MANIFEST.MF | 2 +- .../systests/etc/config-systests-firewall-2.xml | 4 +- .../systests/etc/config-systests-firewall-3.xml | 6 +- .../java/systests/etc/config-systests-settings.xml | 4 +- .../qpid/client/AMQQueueDeferredOrderingTest.java | 12 +- .../qpid/client/ResetMessageListenerTest.java | 4 +- .../AddressBasedFailoverBehaviourTest.java | 14 ++ .../client/failover/FailoverBehaviourTest.java | 276 ++++++++++++++++++++- .../client/prefetch/PrefetchBehaviourTest.java | 116 +++++++++ .../queue/MultipleTransactedBatchProducerTest.java | 3 +- .../server/queue/QueueDepthWithSelectorTest.java | 13 +- .../org/apache/qpid/test/client/CancelTest.java | 2 +- .../qpid/test/client/QueueBrowserAutoAckTest.java | 5 +- .../destination/AddressBasedDestinationTest.java | 179 +++++++++++-- .../qpid/test/unit/client/AMQConnectionTest.java | 3 +- .../test/unit/topic/DurableSubscriptionTest.java | 14 +- .../qpid/test/unit/topic/TopicSessionTest.java | 67 +++-- qpid/java/test-profiles/CPPExcludes | 3 - qpid/java/test-profiles/JavaPre010Excludes | 1 + .../python_tests/Java010PythonExcludes | 133 ++++++++-- 92 files changed, 2057 insertions(+), 886 deletions(-) delete mode 100644 qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java create mode 100644 qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java create mode 100755 qpid/java/broker/src/main/java/broker.bnd create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java delete mode 100644 qpid/java/build.overrides create mode 100644 qpid/java/client/src/test/java/org/apache/qpid/filter/JMSSelectorFilterTest.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java create mode 100644 qpid/java/lib/poms/je-4.0.117.xml create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java diff --git a/qpid/java/bdbstore/bin/backup.sh b/qpid/java/bdbstore/bin/backup.sh index 0e2f0fda09..22d8d52b58 100755 --- a/qpid/java/bdbstore/bin/backup.sh +++ b/qpid/java/bdbstore/bin/backup.sh @@ -31,7 +31,7 @@ WHEREAMI=`dirname $0` if [ -z "$QPID_HOME" ]; then export QPID_HOME=`cd $WHEREAMI/../ && pwd` fi -VERSION=0.13 +VERSION=0.15 LIBS=$QPID_HOME/lib/je-4.0.103.jar:$QPID_HOME/lib/qpid-bdbstore-$VERSION.jar:$QPID_HOME/lib/qpid-all.jar diff --git a/qpid/java/bdbstore/bin/storeUpgrade.sh b/qpid/java/bdbstore/bin/storeUpgrade.sh index 076b9d3f7e..f3b089b3d3 100755 --- a/qpid/java/bdbstore/bin/storeUpgrade.sh +++ b/qpid/java/bdbstore/bin/storeUpgrade.sh @@ -36,7 +36,7 @@ if [ -z "$BDB_HOME" ]; then export BDB_HOME=$(dirname $(dirname $(readlink -f $0))) fi -VERSION=0.13 +VERSION=0.15 LIBS=$BDB_HOME/lib/je-4.0.103.jar:$BDB_HOME/lib/qpid-bdbstore-$VERSION.jar:$QPID_HOME/lib/qpid-all.jar diff --git a/qpid/java/bdbstore/build.xml b/qpid/java/bdbstore/build.xml index 9355358e6c..9513e7cc5b 100644 --- a/qpid/java/bdbstore/build.xml +++ b/qpid/java/bdbstore/build.xml @@ -17,13 +17,14 @@ - under the License. --> - - + + + - + @@ -80,5 +81,4 @@ http://www.oracle.com/technetwork/database/berkeleydb/downloads/jeoslicense-0868 - diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java deleted file mode 100644 index f6344b3d7d..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.testclient; - -import org.apache.log4j.Logger; - -import org.apache.qpid.ping.PingDurableClient; -import org.apache.qpid.server.store.berkeleydb.BDBBackup; -import org.apache.qpid.util.CommandLineParser; - -import java.util.Properties; - -/** - * BackupTestClient extends {@link PingDurableClient} with an action that takes a BDB backup when a configurable - * message count is reached. This enables a test user to restore this beackup, knowing how many committed but undelivered - * messages were in the backup, in order to check that all are re-delivered when the backup is retored. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Perform BDB Backup on configurable message count. - *
- */ -public class BackupTestClient extends PingDurableClient -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(BackupTestClient.class); - - /** Holds the from directory to take backups from. */ - private String fromDir; - - /** Holds the to directory to store backups in. */ - private String toDir; - - /** - * Default constructor, passes all property overrides to the parent. - * - * @param overrides Any property overrides to apply to the defaults. - * - * @throws Exception Any underlying exception is allowed to fall through. - */ - BackupTestClient(Properties overrides) throws Exception - { - super(overrides); - } - - /** - * Starts the ping/wait/receive process. From and to directory locations for the BDB backups must be specified - * on the command line: - * - *

- *
Command Line
Option Comment - *
-fromdir The path to the directory to back the bdb log file from. - *
-todir The path to the directory to save the backed up bdb log files to. - *
- * - * @param args The command line arguments. - */ - public static void main(String[] args) - { - try - { - // Use the same command line format as BDBBackup utility, (compulsory from and to directories). - Properties options = - CommandLineParser.processCommandLine(args, new CommandLineParser(BDBBackup.COMMAND_LINE_SPEC), - System.getProperties()); - BackupTestClient pingProducer = new BackupTestClient(options); - - // Keep the from and to directories for backups. - pingProducer.fromDir = options.getProperty("fromdir"); - pingProducer.toDir = options.getProperty("todir"); - - // Create a shutdown hook to terminate the ping-pong producer. - Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); - - // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. - // pingProducer.getConnection().setExceptionListener(pingProducer); - - // Run the test procedure. - int sent = pingProducer.send(); - pingProducer.waitForUser("Press return to begin receiving the pings."); - pingProducer.receive(sent); - - System.exit(0); - } - catch (Exception e) - { - System.err.println(e.getMessage()); - log.error("Top level handler caught execption.", e); - System.exit(1); - } - } - - /** - * Supplies a triggered action extension, based on message count. This action takes a BDB log file backup. - */ - public void takeAction() - { - BDBBackup backupUtil = new BDBBackup(); - backupUtil.takeBackupNoLock(fromDir, toDir); - System.out.println("Took backup of BDB log files from directory: " + fromDir); - } -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java new file mode 100644 index 0000000000..f6344b3d7d --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java @@ -0,0 +1,120 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.testclient; + +import org.apache.log4j.Logger; + +import org.apache.qpid.ping.PingDurableClient; +import org.apache.qpid.server.store.berkeleydb.BDBBackup; +import org.apache.qpid.util.CommandLineParser; + +import java.util.Properties; + +/** + * BackupTestClient extends {@link PingDurableClient} with an action that takes a BDB backup when a configurable + * message count is reached. This enables a test user to restore this beackup, knowing how many committed but undelivered + * messages were in the backup, in order to check that all are re-delivered when the backup is retored. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Perform BDB Backup on configurable message count. + *
+ */ +public class BackupTestClient extends PingDurableClient +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(BackupTestClient.class); + + /** Holds the from directory to take backups from. */ + private String fromDir; + + /** Holds the to directory to store backups in. */ + private String toDir; + + /** + * Default constructor, passes all property overrides to the parent. + * + * @param overrides Any property overrides to apply to the defaults. + * + * @throws Exception Any underlying exception is allowed to fall through. + */ + BackupTestClient(Properties overrides) throws Exception + { + super(overrides); + } + + /** + * Starts the ping/wait/receive process. From and to directory locations for the BDB backups must be specified + * on the command line: + * + *

+ *
Command Line
Option Comment + *
-fromdir The path to the directory to back the bdb log file from. + *
-todir The path to the directory to save the backed up bdb log files to. + *
+ * + * @param args The command line arguments. + */ + public static void main(String[] args) + { + try + { + // Use the same command line format as BDBBackup utility, (compulsory from and to directories). + Properties options = + CommandLineParser.processCommandLine(args, new CommandLineParser(BDBBackup.COMMAND_LINE_SPEC), + System.getProperties()); + BackupTestClient pingProducer = new BackupTestClient(options); + + // Keep the from and to directories for backups. + pingProducer.fromDir = options.getProperty("fromdir"); + pingProducer.toDir = options.getProperty("todir"); + + // Create a shutdown hook to terminate the ping-pong producer. + Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); + + // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. + // pingProducer.getConnection().setExceptionListener(pingProducer); + + // Run the test procedure. + int sent = pingProducer.send(); + pingProducer.waitForUser("Press return to begin receiving the pings."); + pingProducer.receive(sent); + + System.exit(0); + } + catch (Exception e) + { + System.err.println(e.getMessage()); + log.error("Top level handler caught execption.", e); + System.exit(1); + } + } + + /** + * Supplies a triggered action extension, based on message count. This action takes a BDB log file backup. + */ + public void takeAction() + { + BDBBackup backupUtil = new BDBBackup(); + backupUtil.takeBackupNoLock(fromDir, toDir); + System.out.println("Took backup of BDB log files from directory: " + fromDir); + } +} diff --git a/qpid/java/broker-plugins/experimental/info/src/main/java/org/apache/qpid/info/AppInfo.java b/qpid/java/broker-plugins/experimental/info/src/main/java/org/apache/qpid/info/AppInfo.java index bf3ef61ea1..c8e9805cd9 100644 --- a/qpid/java/broker-plugins/experimental/info/src/main/java/org/apache/qpid/info/AppInfo.java +++ b/qpid/java/broker-plugins/experimental/info/src/main/java/org/apache/qpid/info/AppInfo.java @@ -74,9 +74,9 @@ public class AppInfo appInfoMap.put("port", sc.getPorts().toString()); appInfoMap.put("version", QpidProperties.getReleaseVersion()); appInfoMap.put("vhosts", "standalone"); - appInfoMap.put("KeystorePath", sc.getKeystorePath()); + appInfoMap.put("KeystorePath", sc.getConnectorKeyStorePath()); appInfoMap.put("PluginDirectory", sc.getPluginDirectory()); - appInfoMap.put("CertType", sc.getCertType()); + appInfoMap.put("CertType", sc.getConnectorCertType()); appInfoMap.put("QpidWork", sc.getQpidWork()); appInfoMap.put("Bind", sc.getBind()); } diff --git a/qpid/java/broker-plugins/experimental/info/src/test/java/org/apache/qpid/info/systest/InfoPluginTest.java b/qpid/java/broker-plugins/experimental/info/src/test/java/org/apache/qpid/info/systest/InfoPluginTest.java index 156c9eb138..348e860d5f 100644 --- a/qpid/java/broker-plugins/experimental/info/src/test/java/org/apache/qpid/info/systest/InfoPluginTest.java +++ b/qpid/java/broker-plugins/experimental/info/src/test/java/org/apache/qpid/info/systest/InfoPluginTest.java @@ -210,12 +210,13 @@ public class InfoPluginTest extends QpidBrokerTestCase } br.close(); System.out.println("*** Received buffer: " + buf); - System.out.println("*** Latch countdown"); - _latch.countDown(); synchronized (_recv) { _recv.add(buf); } + + System.out.println("*** Latch countdown"); + _latch.countDown(); } catch (Exception ex) { diff --git a/qpid/java/broker-plugins/experimental/shutdown/src/main/java/shutdown.bnd b/qpid/java/broker-plugins/experimental/shutdown/src/main/java/shutdown.bnd index f6ae79536d..c9e1371732 100755 --- a/qpid/java/broker-plugins/experimental/shutdown/src/main/java/shutdown.bnd +++ b/qpid/java/broker-plugins/experimental/shutdown/src/main/java/shutdown.bnd @@ -17,7 +17,7 @@ # under the License. # -ver: 0.13.0 +ver: 0.15.0 Bundle-SymbolicName: qpid-shutdown-plugin Bundle-Version: ${ver} diff --git a/qpid/java/broker/bin/qpid-server.bat b/qpid/java/broker/bin/qpid-server.bat index af543decb3..6b7bbcb96e 100644 --- a/qpid/java/broker/bin/qpid-server.bat +++ b/qpid/java/broker/bin/qpid-server.bat @@ -65,7 +65,7 @@ if "%AMQJ_LOGGING_LEVEL%" == "" set AMQJ_LOGGING_LEVEL=info REM Set the default system properties that we'll use now that they have REM all been initialised -set SYSTEM_PROPS=-Damqj.logging.level=%AMQJ_LOGGING_LEVEL% "-DQPID_HOME=%QPID_HOME%" "-DQPID_WORK=%QPID_WORK%" +set SYSTEM_PROPS=-Damqj.logging.level=%AMQJ_LOGGING_LEVEL% -DQPID_HOME="%QPID_HOME%" -DQPID_WORK="%QPID_WORK%" if "%EXTERNAL_CLASSPATH%" == "" set EXTERNAL_CLASSPATH=%CLASSPATH% @@ -77,7 +77,7 @@ goto afterQpidClasspath :noQpidClasspath echo Warning: Qpid classpath not set. CLASSPATH set to %QPID_HOME%\lib\qpid-all.jar -set CLASSPATH=%QPID_HOME%\lib\qpid-all.jar +set CLASSPATH=%QPID_HOME%\lib\qpid-all.jar;%QPID_HOME%\lib\opt\* :afterQpidClasspath REM start parsing -run arguments diff --git a/qpid/java/broker/build.xml b/qpid/java/broker/build.xml index b15a7fd02e..6ea2b9a63e 100644 --- a/qpid/java/broker/build.xml +++ b/qpid/java/broker/build.xml @@ -19,10 +19,10 @@ - --> - + @@ -84,4 +84,5 @@ + diff --git a/qpid/java/broker/etc/config.xml b/qpid/java/broker/etc/config.xml index 2752274155..d18e1392e6 100644 --- a/qpid/java/broker/etc/config.xml +++ b/qpid/java/broker/etc/config.xml @@ -35,8 +35,8 @@ false 5671 false - /path/to/keystore.ks - keystorepass + /path/to/keystore.ks + keystorepass 5672 262144 diff --git a/qpid/java/broker/etc/virtualhosts.xml b/qpid/java/broker/etc/virtualhosts.xml index 33a48a1349..4dcdcda6d2 100644 --- a/qpid/java/broker/etc/virtualhosts.xml +++ b/qpid/java/broker/etc/virtualhosts.xml @@ -25,8 +25,9 @@ localhost - org.apache.qpid.server.store.MemoryMessageStore - + org.apache.qpid.server.store.MemoryMessageStore + @@ -85,8 +86,9 @@ development - org.apache.qpid.server.store.MemoryMessageStore - + org.apache.qpid.server.store.MemoryMessageStore + @@ -123,8 +125,9 @@ test - org.apache.qpid.server.store.MemoryMessageStore - + org.apache.qpid.server.store.MemoryMessageStore + diff --git a/qpid/java/broker/src/main/java/broker.bnd b/qpid/java/broker/src/main/java/broker.bnd new file mode 100755 index 0000000000..25b0495a63 --- /dev/null +++ b/qpid/java/broker/src/main/java/broker.bnd @@ -0,0 +1,26 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +ver: 0.15.0 + +Bundle-SymbolicName: qpid-broker +Bundle-Version: ${ver} +Export-Package: *;version=${ver} +Bundle-RequiredExecutionEnvironment: J2SE-1.5 + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java index 9c77032d4c..6abef6fd6b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java @@ -694,7 +694,8 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable public BrokerSchema.BrokerClass.QueueMoveMessagesMethodResponseCommand queueMoveMessages(final BrokerSchema.BrokerClass.QueueMoveMessagesMethodResponseCommandFactory factory, final String srcQueue, final String destQueue, - final Long qty) + final Long qty, + final Map filter) // TODO: move based on group identifier { // TODO return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); @@ -712,6 +713,19 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); } + public BrokerSchema.BrokerClass.GetTimestampConfigMethodResponseCommand getTimestampConfig(final BrokerSchema.BrokerClass.GetTimestampConfigMethodResponseCommandFactory factory) + { + // TODO: timestamp support + return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); + } + + public BrokerSchema.BrokerClass.SetTimestampConfigMethodResponseCommand setTimestampConfig(final BrokerSchema.BrokerClass.SetTimestampConfigMethodResponseCommandFactory factory, + final java.lang.Boolean receive) + { + // TODO: timestamp support + return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); + } + public BrokerSchema.BrokerClass.CreateMethodResponseCommand create(final BrokerSchema.BrokerClass.CreateMethodResponseCommandFactory factory, final String type, final String name, @@ -731,6 +745,14 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); } + public BrokerSchema.BrokerClass.QueryMethodResponseCommand query(final BrokerSchema.BrokerClass.QueryMethodResponseCommandFactory factory, + final String type, + final String name) + { + //TODO: + return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); + } + public UUID getId() { return _obj.getId(); @@ -1102,7 +1124,8 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable } public BrokerSchema.QueueClass.PurgeMethodResponseCommand purge(final BrokerSchema.QueueClass.PurgeMethodResponseCommandFactory factory, - final Long request) + final Long request, + final Map filter) // TODO: support for purge-by-group-identifier { try { @@ -1118,7 +1141,8 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable public BrokerSchema.QueueClass.RerouteMethodResponseCommand reroute(final BrokerSchema.QueueClass.RerouteMethodResponseCommandFactory factory, final Long request, final Boolean useAltExchange, - final String exchange) + final String exchange, + final Map filter) // TODO: support for re-route-by-group-identifier { //TODO return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java index c337c4db75..5c1814590c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java @@ -218,9 +218,9 @@ public class Broker if (serverConfig.getEnableSSL()) { - final String keystorePath = serverConfig.getKeystorePath(); - final String keystorePassword = serverConfig.getKeystorePassword(); - final String certType = serverConfig.getCertType(); + final String keystorePath = serverConfig.getConnectorKeyStorePath(); + final String keystorePassword = serverConfig.getConnectorKeyStorePassword(); + final String certType = serverConfig.getConnectorCertType(); final SSLContext sslContext = SSLContextFactory.buildServerContext(keystorePath, keystorePassword, certType); for(int sslPort : sslPorts) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java index 5cdb886821..7dffc2d3c0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java @@ -21,6 +21,8 @@ package org.apache.qpid.server.configuration; +import java.util.List; + public interface BrokerConfig extends ConfiguredObject { @@ -44,6 +46,19 @@ public interface BrokerConfig extends ConfiguredObject getFeatures(); + void addVirtualHost(VirtualHostConfig virtualHost); void createBrokerConnection(String transport, @@ -53,5 +68,4 @@ public interface BrokerConfig extends ConfiguredObject { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index 9ca916a633..0d347873c2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -123,7 +123,7 @@ public class ServerConfiguration extends ConfigurationPlugin * Configuration Manager to be initialised in the Application Registry. *

* If using this ServerConfiguration via an ApplicationRegistry there is no - * need to explictly call {@link #initialise()} as this is done via the + * need to explicitly call {@link #initialise()} as this is done via the * {@link ApplicationRegistry#initialise()} method. * * @param configurationURL @@ -169,7 +169,7 @@ public class ServerConfiguration extends ConfigurationPlugin * Configuration Manager to be initialised in the Application Registry. *

* If using this ServerConfiguration via an ApplicationRegistry there is no - * need to explictly call {@link #initialise()} as this is done via the + * need to explicitly call {@link #initialise()} as this is done via the * {@link ApplicationRegistry#initialise()} method. * * @param conf @@ -239,6 +239,22 @@ public class ServerConfiguration extends ConfigurationPlugin + (_configFile == null ? "" : " Configuration file : " + _configFile); throw new ConfigurationException(message); } + + // QPID-3517: Inconsistency in capitalisation in the SSL configuration keys used within the connector and management configuration + // sections. For the moment, continue to understand both but generate a deprecated warning if the less preferred keystore is used. + for (String key : new String[] {"management.ssl.keystorePath", + "management.ssl.keystorePassword," + + "connector.ssl.keystorePath", + "connector.ssl.keystorePassword"}) + { + if (contains(key)) + { + final String deprecatedXpath = key.replaceAll("\\.", "/"); + final String preferredXpath = deprecatedXpath.replaceAll("keystore", "keyStore"); + _logger.warn("Validation warning: " + deprecatedXpath + " is deprecated and must be replaced by " + preferredXpath + + (_configFile == null ? "" : " Configuration file : " + _configFile)); + } + } } /* @@ -404,7 +420,7 @@ public class ServerConfiguration extends ConfigurationPlugin public final static Configuration flatConfig(File file) throws ConfigurationException { // We have to override the interpolate methods so that - // interpolation takes place accross the entirety of the + // interpolation takes place across the entirety of the // composite configuration. Without doing this each // configuration object only interpolates variables defined // inside itself. @@ -551,7 +567,8 @@ public class ServerConfiguration extends ConfigurationPlugin public String getManagementKeyStorePath() { - return getStringValue("management.ssl.keyStorePath"); + final String fallback = getStringValue("management.ssl.keystorePath"); + return getStringValue("management.ssl.keyStorePath", fallback); } public boolean getManagementSSLEnabled() @@ -561,7 +578,8 @@ public class ServerConfiguration extends ConfigurationPlugin public String getManagementKeyStorePassword() { - return getStringValue("management.ssl.keyStorePassword"); + final String fallback = getStringValue("management.ssl.keystorePassword"); + return getStringValue("management.ssl.keyStorePassword", fallback); } public boolean getQueueAutoRegister() @@ -699,17 +717,19 @@ public class ServerConfiguration extends ConfigurationPlugin return getListValue("connector.ssl.port", Collections.singletonList(DEFAULT_SSL_PORT)); } - public String getKeystorePath() + public String getConnectorKeyStorePath() { - return getStringValue("connector.ssl.keystorePath"); + final String fallback = getStringValue("connector.ssl.keystorePath"); // pre-0.13 broker supported this name. + return getStringValue("connector.ssl.keyStorePath", fallback); } - public String getKeystorePassword() + public String getConnectorKeyStorePassword() { - return getStringValue("connector.ssl.keystorePassword"); + final String fallback = getStringValue("connector.ssl.keystorePassword"); // pre-0.13 brokers supported this name. + return getStringValue("connector.ssl.keyStorePassword", fallback); } - public String getCertType() + public String getConnectorCertType() { return getStringValue("connector.ssl.certType", "SunX509"); } @@ -773,4 +793,16 @@ public class ServerConfiguration extends ConfigurationPlugin { return getIntValue("maximumChannelCount", 256); } + + /** + * List of Broker features that have been disabled within configuration. Disabled + * features won't be advertised to the clients on connection. + * + * @return list of disabled features, or empty list if no features are disabled. + */ + public List getDisabledFeatures() + { + final List disabledFeatures = getListValue("disabledFeatures", Collections.emptyList()); + return disabledFeatures; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java index f21158cd0c..f330e2f708 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.federation; +import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; import org.apache.qpid.server.configuration.ConnectionConfig; @@ -252,7 +253,7 @@ public class BrokerLink implements LinkConfig, ConnectionListener _qpidConnection.connect(_host, _port, _remoteVhost, _username, _password, "ssl".equals(_transport), _authMechanism); final Map serverProps = _qpidConnection.getServerProperties(); - _remoteFederationTag = (String) serverProps.get("qpid.federation_tag"); + _remoteFederationTag = (String) serverProps.get(ServerPropertyNames.FEDERATION_TAG); if(_remoteFederationTag == null) { _remoteFederationTag = UUID.fromString(_transport+":"+_host+":"+_port).toString(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java index f16e30ef92..0e0b18aa2f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java @@ -30,20 +30,17 @@ import org.apache.qpid.server.configuration.SessionConfig; import org.apache.qpid.server.queue.AMQQueue; -import java.util.concurrent.atomic.AtomicInteger; import java.lang.ref.WeakReference; import java.nio.ByteBuffer; /** * A deliverable message. */ -public class AMQMessage implements ServerMessage +public class AMQMessage extends AbstractServerMessageImpl { /** Used for debugging purposes. */ private static final Logger _log = Logger.getLogger(AMQMessage.class); - private final AtomicInteger _referenceCount = new AtomicInteger(0); - /** Flag to indicate that this message requires 'immediate' delivery. */ private static final byte IMMEDIATE = 0x01; @@ -76,6 +73,8 @@ public class AMQMessage implements ServerMessage public AMQMessage(StoredMessage handle, WeakReference channelRef) { + super(handle); + _handle = handle; final MessageMetaData metaData = handle.getMetaData(); _size = metaData.getContentSize(); @@ -89,12 +88,6 @@ public class AMQMessage implements ServerMessage _channelRef = channelRef; } - - public String debugIdentity() - { - return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() + " Ref:" + _referenceCount.get() + ")"; - } - public void setExpiration(final long expiration) { @@ -102,11 +95,6 @@ public class AMQMessage implements ServerMessage } - public boolean isReferenced() - { - return _referenceCount.get() > 0; - } - public MessageMetaData getMessageMetaData() { return _handle.getMetaData(); @@ -117,87 +105,11 @@ public class AMQMessage implements ServerMessage return getMessageMetaData().getContentHeaderBody(); } - - public Long getMessageId() { return _handle.getMessageNumber(); } - /** - * Creates a long-lived reference to this message, and increments the count of such references, as an atomic - * operation. - */ - public AMQMessage takeReference() - { - incrementReference(); // _referenceCount.incrementAndGet(); - - return this; - } - - public boolean incrementReference() - { - return incrementReference(1); - } - - /* Threadsafe. Increment the reference count on the message. */ - public boolean incrementReference(int count) - { - - if(_referenceCount.addAndGet(count) <= 0) - { - _referenceCount.addAndGet(-count); - return false; - } - else - { - return true; - } - - } - - /** - * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the - * message store. - * - * - * @throws org.apache.qpid.server.queue.MessageCleanupException when an attempt was made to remove the message from the message store and that - * failed - */ - public void decrementReference() - { - int count = _referenceCount.decrementAndGet(); - - // note that the operation of decrementing the reference count and then removing the message does not - // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after - // the message has been passed to all queues. i.e. we are - // not relying on the all the increments having taken place before the delivery manager decrements. - if (count == 0) - { - // set the reference count way below 0 so that we can detect that the message has been deleted - // this is to guard against the message being spontaneously recreated (from the mgmt console) - // by copying from other queues at the same time as it is being removed. - _referenceCount.set(Integer.MIN_VALUE/2); - - // must check if the handle is null since there may be cases where we decide to throw away a message - // and the handle has not yet been constructed - if (_handle != null) - { - _handle.remove(); - - } - } - else - { - if (count < 0) - { - throw new RuntimeException("Reference count for message id " + debugIdentity() - + " has gone below 0."); - } - } - } - - /** * Called selectors to determin if the message has already been sent * @@ -323,10 +235,7 @@ public class AMQMessage implements ServerMessage public String toString() { - // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + - // _taken + " by :" + _takenBySubcription; - - return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + _referenceCount; + return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + getReferenceCount(); } public int getContent(ByteBuffer buf, int offset) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java new file mode 100644 index 0000000000..186bb8601c --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java @@ -0,0 +1,84 @@ +package org.apache.qpid.server.message; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.qpid.server.store.StoredMessage; + +public abstract class AbstractServerMessageImpl implements ServerMessage +{ + private final AtomicInteger _referenceCount = new AtomicInteger(0); + private final StoredMessage _handle; + + public AbstractServerMessageImpl(StoredMessage handle) + { + _handle = handle; + } + + public boolean incrementReference() + { + return incrementReference(1); + } + + public boolean incrementReference(int count) + { + if(_referenceCount.addAndGet(count) <= 0) + { + _referenceCount.addAndGet(-count); + return false; + } + else + { + return true; + } + } + + /** + * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the + * message store. + * + * + * @throws org.apache.qpid.server.queue.MessageCleanupException when an attempt was made to remove the message from the message store and that + * failed + */ + public void decrementReference() + { + int count = _referenceCount.decrementAndGet(); + + // note that the operation of decrementing the reference count and then removing the message does not + // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after + // the message has been passed to all queues. i.e. we are + // not relying on the all the increments having taken place before the delivery manager decrements. + if (count == 0) + { + // set the reference count way below 0 so that we can detect that the message has been deleted + // this is to guard against the message being spontaneously recreated (from the mgmt console) + // by copying from other queues at the same time as it is being removed. + _referenceCount.set(Integer.MIN_VALUE/2); + + // must check if the handle is null since there may be cases where we decide to throw away a message + // and the handle has not yet been constructed + if (_handle != null) + { + _handle.remove(); + } + } + else + { + if (count < 0) + { + throw new RuntimeException("Reference count for message id " + debugIdentity() + + " has gone below 0."); + } + } + } + + public String debugIdentity() + { + return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageNumber() + " Ref:" + getReferenceCount() + ")"; + } + + protected int getReferenceCount() + { + return _referenceCount.get(); + } +} \ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java index 2297e4200d..f9863f4945 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java @@ -44,7 +44,7 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes private int _bodySize; private volatile SoftReference _body; - private static final int ENCODER_SIZE = 1 << 16; + private static final int ENCODER_SIZE = 1 << 10; public static final MessageMetaDataType.Factory FACTORY = new MetaDataFactory(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java index af78042a63..1a230a2590 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java @@ -29,18 +29,14 @@ import java.nio.ByteBuffer; import java.lang.ref.WeakReference; -public class MessageTransferMessage implements InboundMessage, ServerMessage +public class MessageTransferMessage extends AbstractServerMessageImpl implements InboundMessage { - - private StoredMessage _storeMessage; - - private WeakReference _sessionRef; public MessageTransferMessage(StoredMessage storeMessage, WeakReference sessionRef) { - + super(storeMessage); _storeMessage = storeMessage; _sessionRef = sessionRef; } @@ -145,5 +141,4 @@ public class MessageTransferMessage implements InboundMessage, ServerMessage getFeatures() + { + final List features = new ArrayList(); + if (!_instance.getConfiguration().getDisabledFeatures().contains(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR)) + { + features.add(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR); + } + + return Collections.unmodifiableList(features); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 92f21b8b1c..adb0a84151 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -460,7 +460,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public void queueDeleted(AMQQueue queue) { _deleted.set(true); -// _channel.queueDeleted(queue); } public boolean filtersMessages() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index d83013afba..e18b453db3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -259,10 +259,11 @@ public class ServerConnection extends Connection implements AMQConnectionModel, public void close(AMQConstant cause, String message) throws AMQException { + closeSubscriptions(); ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; try { - replyCode = ConnectionCloseCode.get(cause.getCode()); + replyCode = ConnectionCloseCode.get(cause.getCode()); } catch (IllegalArgumentException iae) { @@ -389,7 +390,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, } @Override - public boolean isSessionNameUnique(String name) + public boolean isSessionNameUnique(byte[] name) { return !super.hasSessionWithName(name); } @@ -399,4 +400,20 @@ public class ServerConnection extends Connection implements AMQConnectionModel, { return _authorizedPrincipal.getName(); } + + @Override + public void closed() + { + closeSubscriptions(); + super.closed(); + } + + private void closeSubscriptions() + { + for (Session ssn : getChannels()) + { + ((ServerSession)ssn).unregisterSubscriptions(); + } + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java index 2de8a0425e..8d6e0e0d80 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -32,7 +32,9 @@ import java.util.StringTokenizer; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; +import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.server.configuration.BrokerConfig; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; @@ -58,15 +60,14 @@ import org.apache.qpid.transport.SessionDetached; public class ServerConnectionDelegate extends ServerDelegate { - private String _localFQDN; + private final String _localFQDN; private final IApplicationRegistry _appRegistry; public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN) { - this(new HashMap(Collections.singletonMap("qpid.federation_tag",appRegistry.getBroker().getFederationTag())), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN); + this(createConnectionProperties(appRegistry.getBroker()), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN); } - public ServerConnectionDelegate(Map properties, List locales, IApplicationRegistry appRegistry, @@ -78,6 +79,18 @@ public class ServerConnectionDelegate extends ServerDelegate _localFQDN = localFQDN; } + private static Map createConnectionProperties(final BrokerConfig brokerConfig) + { + final Map map = new HashMap(2); + map.put(ServerPropertyNames.FEDERATION_TAG, brokerConfig.getFederationTag()); + final List features = brokerConfig.getFeatures(); + if (features != null && features.size() > 0) + { + map.put(ServerPropertyNames.QPID_FEATURES, features); + } + return map; + } + private static List parseToList(String mechanisms) { List list = new ArrayList(); @@ -234,22 +247,22 @@ public class ServerConnectionDelegate extends ServerDelegate @Override public void sessionAttach(final Connection conn, final SessionAttach atc) { - final String clientId = new String(atc.getName()); - final Session ssn = getSession(conn, atc); + final Session ssn; - if(isSessionNameUnique(clientId,conn)) + if(isSessionNameUnique(atc.getName(), conn)) { + ssn = sessionAttachImpl(conn, atc); conn.registerSession(ssn); - super.sessionAttach(conn, atc); } else { + ssn = getSession(conn, atc); ssn.invoke(new SessionDetached(atc.getName(), SessionDetachCode.SESSION_BUSY)); ssn.closed(); } } - private boolean isSessionNameUnique(final String name, final Connection conn) + private boolean isSessionNameUnique(final byte[] name, final Connection conn) { final ServerConnection sconn = (ServerConnection) conn; final String userId = sconn.getUserName(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 28dec2ad28..429cc4cf66 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -53,6 +53,7 @@ import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -172,6 +173,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi public void postCommit() { + MessageReference ref = message.newReference(); for(int i = 0; i < _queues.length; i++) { try @@ -184,6 +186,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi throw new RuntimeException(e); } } + ref.release(); } public void onRollback() @@ -412,12 +415,11 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi { queue.unregisterSubscription(sub); } - } catch (AMQException e) { // TODO - _logger.error("Failed to unregister subscription", e); + _logger.error("Failed to unregister subscription :" + e.getMessage(), e); } finally { @@ -683,12 +685,17 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi { // unregister subscriptions in order to prevent sending of new messages // to subscriptions with closing session + unregisterSubscriptions(); + + super.close(); + } + + void unregisterSubscriptions() + { final Collection subscriptions = getSubscriptions(); for (Subscription_0_10 subscription_0_10 : subscriptions) { unregister(subscription_0_10); } - - super.close(); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 17bd06538f..c87919b478 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -1261,11 +1261,10 @@ public class ServerSessionDelegate extends SessionDelegate { setThreadSubject(session); - for(Subscription_0_10 sub : getSubscriptions(session)) - { - ((ServerSession)session).unregister(sub); - } - ((ServerSession)session).onClose(); + ServerSession serverSession = (ServerSession)session; + + serverSession.unregisterSubscriptions(); + serverSession.onClose(); } @Override @@ -1274,11 +1273,6 @@ public class ServerSessionDelegate extends SessionDelegate closed(session); } - public Collection getSubscriptions(Session session) - { - return ((ServerSession)session).getSubscriptions(); - } - private void setThreadSubject(Session session) { final ServerConnection scon = (ServerConnection) session.getConnection(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java index d368a2d1ee..9afd2a45a9 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java @@ -256,7 +256,7 @@ public class ServerConfigurationTest extends QpidTestCase assertEquals(false, _serverConfig.getManagementSSLEnabled()); } - public void testGetManagementKeyStorePassword() throws ConfigurationException + public void testGetManagementKeystorePassword() throws ConfigurationException { // Check default _serverConfig.initialise(); @@ -534,43 +534,57 @@ public class ServerConfigurationTest extends QpidTestCase assertEquals("10", _serverConfig.getSSLPorts().get(0)); } - public void testGetKeystorePath() throws ConfigurationException + public void testGetConnectorKeystorePath() throws ConfigurationException { // Check default _serverConfig.initialise(); - assertNull(_serverConfig.getKeystorePath()); + assertNull(_serverConfig.getConnectorKeyStorePath()); // Check value we set - _config.setProperty("connector.ssl.keystorePath", "a"); + _config.setProperty("connector.ssl.keyStorePath", "a"); _serverConfig = new ServerConfiguration(_config); _serverConfig.initialise(); - assertEquals("a", _serverConfig.getKeystorePath()); + assertEquals("a", _serverConfig.getConnectorKeyStorePath()); + + // Ensure we continue to support the old name keystorePath + _config.clearProperty("connector.ssl.keyStorePath"); + _config.setProperty("connector.ssl.keystorePath", "b"); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals("b", _serverConfig.getConnectorKeyStorePath()); } - public void testGetKeystorePassword() throws ConfigurationException + public void testGetConnectorKeystorePassword() throws ConfigurationException { // Check default _serverConfig.initialise(); - assertNull(_serverConfig.getKeystorePassword()); + assertNull(_serverConfig.getConnectorKeyStorePassword()); // Check value we set - _config.setProperty("connector.ssl.keystorePassword", "a"); + _config.setProperty("connector.ssl.keyStorePassword", "a"); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals("a", _serverConfig.getConnectorKeyStorePassword()); + + // Ensure we continue to support the old name keystorePassword + _config.clearProperty("connector.ssl.keyStorePassword"); + _config.setProperty("connector.ssl.keystorePassword", "b"); _serverConfig = new ServerConfiguration(_config); _serverConfig.initialise(); - assertEquals("a", _serverConfig.getKeystorePassword()); + assertEquals("b", _serverConfig.getConnectorKeyStorePassword()); } - public void testGetCertType() throws ConfigurationException + public void testGetConnectorCertType() throws ConfigurationException { // Check default _serverConfig.initialise(); - assertEquals("SunX509", _serverConfig.getCertType()); + assertEquals("SunX509", _serverConfig.getConnectorCertType()); // Check value we set _config.setProperty("connector.ssl.certType", "a"); _serverConfig = new ServerConfiguration(_config); _serverConfig.initialise(); - assertEquals("a", _serverConfig.getCertType()); + assertEquals("a", _serverConfig.getConnectorCertType()); } public void testGetUseBiasedWrites() throws ConfigurationException @@ -1284,7 +1298,7 @@ public class ServerConfigurationTest extends QpidTestCase } /** - * Test that a non-existant virtualhost file throws a {@link ConfigurationException}. + * Test that a non-existent virtualhost file throws a {@link ConfigurationException}. *

* Test for QPID-2624 */ @@ -1312,7 +1326,27 @@ public class ServerConfigurationTest extends QpidTestCase } } - /* + /** + * Tests that element disabledFeatures allows features that would + * otherwise be advertised by the broker to be turned off. + */ + public void testDisabledFeatures() throws ConfigurationException + { + // Check default + _serverConfig.initialise(); + _serverConfig = new ServerConfiguration(_config); + assertEquals("Unexpected size", 0, _serverConfig.getDisabledFeatures().size()); + + // Check value we set + _config.addProperty("disabledFeatures", "qpid.feature1"); + _config.addProperty("disabledFeatures", "qpid.feature2"); + _serverConfig = new ServerConfiguration(_config); + + assertEquals("Unexpected size",2, _serverConfig.getDisabledFeatures().size()); + assertTrue("Unexpected contents", _serverConfig.getDisabledFeatures().contains("qpid.feature1")); + } + + /** * Tests that the old element security.jmx.access (that used to be used * to define JMX access rights) is rejected. */ @@ -1338,7 +1372,7 @@ public class ServerConfigurationTest extends QpidTestCase } } - /* + /** * Tests that the old element security.jmx.principal-database (that used to define the * principal database used for JMX authentication) is rejected. */ @@ -1364,7 +1398,7 @@ public class ServerConfigurationTest extends QpidTestCase } } - /* + /** * Tests that the old element security.principal-databases. ... (that used to define * principal databases) is rejected. */ @@ -1389,7 +1423,7 @@ public class ServerConfigurationTest extends QpidTestCase } } - /* + /** * Tests that the old element housekeeping.expiredMessageCheckPeriod. ... (that was * replaced by housekeeping.checkPeriod) is rejected. */ diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtilTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtilTest.java index 4a03445357..f2249c5931 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtilTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtilTest.java @@ -72,11 +72,11 @@ public class OsgiSystemPackageUtilTest extends QpidTestCase _map.put("org.apache.qpid.xyz", "1.0.0"); _map.put("org.abc", "1.2.3"); - _util = new OsgiSystemPackageUtil(new Version("0.13"), _map); + _util = new OsgiSystemPackageUtil(new Version("0.15"), _map); final String systemPackageString = _util.getFormattedSystemPackageString(); - assertEquals("org.abc; version=1.2.3, org.apache.qpid.xyz; version=0.13.0", systemPackageString); + assertEquals("org.abc; version=1.2.3, org.apache.qpid.xyz; version=0.15.0", systemPackageString); } public void testWithQpidPackageWithoutQpidReleaseNumberSet() throws Exception diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java index 2d41eb9899..2ffa157ca8 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java @@ -86,11 +86,7 @@ public class ReferenceCountingTest extends QpidTestCase AMQMessage message = new AMQMessage(storedMessage); - message = message.takeReference(); - - // we call routing complete to set up the handle - // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); - + message.incrementReference(); assertEquals(1, _store.getMessageCount()); message.decrementReference(); @@ -146,12 +142,12 @@ public class ReferenceCountingTest extends QpidTestCase AMQMessage message = new AMQMessage(storedMessage); - message = message.takeReference(); + message.incrementReference(); // we call routing complete to set up the handle // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); assertEquals(1, _store.getMessageCount()); - message = message.takeReference(); + message.incrementReference(); message.decrementReference(); assertEquals(1, _store.getMessageCount()); } diff --git a/qpid/java/build.deps b/qpid/java/build.deps index 15844d7a42..17f963ece7 100644 --- a/qpid/java/build.deps +++ b/qpid/java/build.deps @@ -139,6 +139,6 @@ management-eclipse-plugin.test.libs=${test.libs} management-common.test.libs=${test.libs} # optional bdbstore module deps -bdb-je=lib/bdbstore/je-4.0.103.jar +bdb-je=lib/bdbstore/je-4.0.117.jar bdbstore.libs=${bdb-je} bdbstore.test.libs=${test.libs} diff --git a/qpid/java/build.overrides b/qpid/java/build.overrides deleted file mode 100644 index dbe05b4ec0..0000000000 --- a/qpid/java/build.overrides +++ /dev/null @@ -1,24 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# File to allow overriding default values of properties within the build system -# without having to specify them on the command line on every build execution - -# Override the setting for the optional modules to be built -#modules.opt=bdbstore diff --git a/qpid/java/build.xml b/qpid/java/build.xml index 4400626370..e6c154d3d0 100644 --- a/qpid/java/build.xml +++ b/qpid/java/build.xml @@ -22,8 +22,6 @@ - - diff --git a/qpid/java/client/src/main/java/client.bnd b/qpid/java/client/src/main/java/client.bnd index 98696dc7d8..bd13844e8e 100755 --- a/qpid/java/client/src/main/java/client.bnd +++ b/qpid/java/client/src/main/java/client.bnd @@ -17,7 +17,7 @@ # under the License. # -ver: 0.13.0 +ver: 0.15.0 Bundle-SymbolicName: qpid-client Bundle-Version: ${ver} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index f15af72407..ad7885f195 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -175,6 +175,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // new amqp-0-10 encoded format. private boolean _useLegacyMapMessageFormat; + //used to track the last failover time for + //Address resolution purposes + private volatile long _lastFailoverTime = 0; + /** * @param broker brokerdetails * @param username username @@ -1076,6 +1080,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect */ public boolean firePreFailover(boolean redirect) { + _lastFailoverTime = System.currentTimeMillis(); boolean proceed = true; if (_connectionListener != null) { @@ -1397,6 +1402,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return null; } } + + /** + * Tests whether the Broker has advertised support for the named feature. + * + * @param featureName + * + * @return true if the feature is supported, or false otherwise. + */ + boolean isSupportedServerFeature(final String featureName) + { + return _delegate.isSupportedServerFeature(featureName); + } + public boolean isFailingOver() { return (_protocolHandler.getFailoverLatch() != null); @@ -1462,4 +1480,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } } + + public long getLastFailoverTime() + { + return _lastFailoverTime; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 8768f93c8c..afb0e45f7a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -65,4 +65,16 @@ public interface AMQConnectionDelegate ProtocolVersion getProtocolVersion(); boolean verifyClientID() throws JMSException, AMQException; + + /** + * Tests whether the server has advertised support for the specified feature + * via the qpid.features server connection property. By convention the feature name + * with begin qpid. followed by one or more words separated by minus signs + * e.g. qpid.jms-selector. + * + * @param featureName name of feature. + * + * @return true if the feature is supported by the server + */ + boolean isSupportedServerFeature(final String featureName); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 0ed3db6ecb..5e4f84ce9f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -1,4 +1,3 @@ -package org.apache.qpid.client; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -20,6 +19,7 @@ package org.apache.qpid.client; * */ +package org.apache.qpid.client; import java.io.IOException; import java.util.ArrayList; @@ -36,6 +36,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.transport.ClientConnectionDelegate; +import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; @@ -62,17 +63,13 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec */ private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_0_10.class); - /** - * The name of the UUID property - */ - private static final String UUID_NAME = "qpid.federation_tag"; /** * The AMQ Connection. */ - private AMQConnection _conn; + private final AMQConnection _conn; /** - * The QpidConeection instance that is mapped with thie JMS connection. + * The QpidConeection instance that is mapped with this JMS connection. */ org.apache.qpid.transport.Connection _qpidConnection; private ConnectionException exception = null; @@ -281,24 +278,29 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { _conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1)); - try + _qpidConnection.notifyFailoverRequired(); + + synchronized (_conn.getFailoverMutex()) { - if (_conn.firePreFailover(false) && _conn.attemptReconnection()) + try { - _conn.failoverPrep(); - _conn.resubscribeSessions(); - _conn.fireFailoverComplete(); - return; + if (_conn.firePreFailover(false) && _conn.attemptReconnection()) + { + _conn.failoverPrep(); + _conn.resubscribeSessions(); + _conn.fireFailoverComplete(); + return; + } + } + catch (Exception e) + { + _logger.error("error during failover", e); + } + finally + { + _conn.getProtocolHandler().getFailoverLatch().countDown(); + _conn.getProtocolHandler().setFailoverLatch(null); } - } - catch (Exception e) - { - _logger.error("error during failover", e); - } - finally - { - _conn.getProtocolHandler().getFailoverLatch().countDown(); - _conn.getProtocolHandler().setFailoverLatch(null); } } @@ -324,6 +326,18 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec public T executeRetrySupport(FailoverProtectedOperation operation) throws E { + if (_conn.isFailingOver()) + { + try + { + _conn.blockUntilNotFailingOver(); + } + catch (InterruptedException e) + { + //ignore + } + } + try { return operation.execute(); @@ -352,7 +366,32 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec public String getUUID() { - return (String)_qpidConnection.getServerProperties().get(UUID_NAME); + return (String)_qpidConnection.getServerProperties().get(ServerPropertyNames.FEDERATION_TAG); + } + + /* + * @see org.apache.qpid.client.AMQConnectionDelegate#isSupportedServerFeature(java.lang.String) + */ + public boolean isSupportedServerFeature(final String featureName) + { + if (featureName == null) + { + throw new IllegalArgumentException("featureName cannot be null"); + } + final Map serverProperties = _qpidConnection.getServerProperties(); + boolean featureSupported = false; + if (serverProperties != null && serverProperties.containsKey(ServerPropertyNames.QPID_FEATURES)) + { + final Object supportServerFeatures = serverProperties.get(ServerPropertyNames.QPID_FEATURES); + featureSupported = supportServerFeatures instanceof List && ((List)supportServerFeatures).contains(featureName); + } + + if (_logger.isDebugEnabled()) + { + _logger.debug("Server support for feature '" + featureName + "' : " + featureSupported); + } + + return featureSupported; } private ConnectionSettings retriveConnectionSettings(BrokerDetails brokerDetail) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index b1a22155d6..bff4df0e93 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.net.ConnectException; import java.nio.channels.UnresolvedAddressException; import java.security.GeneralSecurityException; -import java.security.Security; import java.text.MessageFormat; import java.util.ArrayList; import java.util.EnumSet; @@ -44,6 +43,7 @@ import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateWaiter; +import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.framing.BasicQosBody; import org.apache.qpid.framing.BasicQosOkBody; import org.apache.qpid.framing.ChannelOpenBody; @@ -66,7 +66,7 @@ import org.slf4j.LoggerFactory; public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class); - private AMQConnection _conn; + private final AMQConnection _conn; public void closeConnection(long timeout) throws JMSException, AMQException @@ -379,4 +379,14 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { return true; } + + /* + * @see org.apache.qpid.client.AMQConnectionDelegate#isSupportedServerFeature(java.lang.String) + */ + public boolean isSupportedServerFeature(String featureName) + { + // The Qpid Java Broker 0-8..0-9-1 does not advertise features by the qpid.features property, so for now + // we just hardcode JMS selectors as supported. + return ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR.equals(featureName); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index acd46da11a..f9a38138ba 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -22,6 +22,7 @@ package org.apache.qpid.client; import java.net.URISyntaxException; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import javax.jms.Destination; import javax.naming.NamingException; @@ -59,7 +60,7 @@ public abstract class AMQDestination implements Destination, Referenceable private boolean _browseOnly; - private boolean _isAddressResolved; + private AtomicLong _addressResolved = new AtomicLong(0); private AMQShortString _queueName; @@ -77,7 +78,7 @@ public abstract class AMQDestination implements Destination, Referenceable public static final int QUEUE_TYPE = 1; public static final int TOPIC_TYPE = 2; public static final int UNKNOWN_TYPE = 3; - + // ----- Fields required to support new address syntax ------- public enum DestSyntax { @@ -740,12 +741,12 @@ public abstract class AMQDestination implements Destination, Referenceable public boolean isAddressResolved() { - return _isAddressResolved; + return _addressResolved.get() > 0; } - public void setAddressResolved(boolean addressResolved) + public void setAddressResolved(long addressResolved) { - _isAddressResolved = addressResolved; + _addressResolved.set(addressResolved); } private static Address createAddressFromString(String str) @@ -823,7 +824,7 @@ public abstract class AMQDestination implements Destination, Referenceable dest.setTargetNode(_targetNode); dest.setSourceNode(_sourceNode); dest.setLink(_link); - dest.setAddressResolved(_isAddressResolved); + dest.setAddressResolved(_addressResolved.get()); return dest; } @@ -836,4 +837,9 @@ public abstract class AMQDestination implements Destination, Referenceable { _isDurable = b; } + + public boolean isResolvedAfter(long time) + { + return _addressResolved.get() > time; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 44c4e8987a..ef44221ec1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -89,9 +89,9 @@ import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.filter.MessageFilter; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; @@ -308,7 +308,7 @@ public abstract class AMQSession arguments = FieldTable.convertToMap(consumer.getArguments()); - - if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL) - { - isTopic = consumer.getDestination() instanceof AMQTopic || - consumer.getDestination().getExchangeClass().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS) ; - - preAcquire = isTopic || (!consumer.isNoConsume() && - (consumer.getMessageSelector() == null || consumer.getMessageSelector().equals(""))); - } - else - { - isTopic = consumer.getDestination().getAddressType() == AMQDestination.TOPIC_TYPE; - - preAcquire = !consumer.isNoConsume() && - (isTopic || consumer.getMessageSelector() == null || - consumer.getMessageSelector().equals("")); - - arguments.putAll( - (Map) consumer.getDestination().getLink().getSubscription().getArgs()); - } - - boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE; - - if (consumer.getDestination().getLink() != null) - { - acceptModeNone = consumer.getDestination().getLink().getReliability() == Link.Reliability.UNRELIABLE; - } - - getQpidSession().messageSubscribe - (queueName.toString(), String.valueOf(tag), - acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, - preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments, - consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); - } - catch (JMSException e) + boolean preAcquire = consumer.isPreAcquire(); + + AMQDestination destination = consumer.getDestination(); + long capacity = consumer.getCapacity(); + + Map arguments = FieldTable.convertToMap(consumer.getArguments()); + + Link link = destination.getLink(); + if (link != null && link.getSubscription() != null && link.getSubscription().getArgs() != null) { - throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e); + arguments.putAll((Map) link.getSubscription().getArgs()); } + boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE; + + getQpidSession().messageSubscribe + (queueName.toString(), String.valueOf(tag), + acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, + preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments, + consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString(); if (capacity == 0) @@ -646,21 +633,6 @@ public class AMQSession_0_10 extends AMQSession 0) - { - capacity = destination.getLink().getConsumerCapacity(); - } - else if (prefetch()) - { - capacity = getAMQConnection().getMaxPrefetch(); - } - return capacity; - } - /** * Create an 0_10 message producer */ @@ -825,7 +797,7 @@ public class AMQSession_0_10 extends AMQSession 0 && (_connection.getMaxPrefetch() == 1 || + _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0)) { // send completed so consumer credits don't dry up messageAcknowledge(_txRangeSet, false); @@ -1168,8 +1146,8 @@ public class AMQSession_0_10 extends AMQSession map) - { - StringBuilder sb = new StringBuilder(); - sb.append("<"); - if (map != null) - { - for(String key : map.keySet()) - { - sb.append(key).append(" = ").append(map.get(key)).append(" "); - } - } - sb.append(">"); - return sb.toString(); - } - protected void acknowledgeImpl() { RangeSet range = gatherUnackedRangeSet(); @@ -1378,4 +1324,15 @@ public class AMQSession_0_10 extends AMQSession + // messages sent by the brokers following the first rollback + // after failover + _highestDeliveryTag.set(-1); + super.resubscribe(); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 369c8a6e9d..e33410f5fe 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -41,6 +41,7 @@ import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.filter.MessageFilter; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; @@ -333,24 +334,9 @@ public final class AMQSession_0_8 extends AMQSession extends Closeable implements Messa /** The connection being used by this consumer */ protected final AMQConnection _connection; - protected final String _messageSelector; + protected final MessageFilter _messageSelectorFilter; private final boolean _noLocal; @@ -138,7 +143,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa */ private final boolean _autoClose; - private final boolean _noConsume; + private final boolean _browseOnly; private List _closedStack = null; @@ -146,28 +151,44 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, - FieldTable arguments, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) + FieldTable rawSelector, int prefetchHigh, int prefetchLow, + boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException { _channelId = channelId; _connection = connection; - _messageSelector = messageSelector; _noLocal = noLocal; _destination = destination; _messageFactory = messageFactory; _session = session; _protocolHandler = protocolHandler; - _arguments = arguments; _prefetchHigh = prefetchHigh; _prefetchLow = prefetchLow; _exclusive = exclusive; _synchronousQueue = new LinkedBlockingQueue(); _autoClose = autoClose; - _noConsume = noConsume; + _browseOnly = browseOnly; + + try + { + if (messageSelector == null || "".equals(messageSelector.trim())) + { + _messageSelectorFilter = null; + } + else + { + _messageSelectorFilter = new JMSSelectorFilter(messageSelector); + } + } + catch (final AMQInternalException ie) + { + InvalidSelectorException ise = new InvalidSelectorException("cannot create consumer because of selector issue"); + ise.setLinkedException(ie); + throw ise; + } // Force queue browsers not to use acknowledge modes. - if (_noConsume) + if (_browseOnly) { _acknowledgeMode = Session.NO_ACKNOWLEDGE; } @@ -175,6 +196,21 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa { _acknowledgeMode = acknowledgeMode; } + + final FieldTable ft = FieldTableFactory.newFieldTable(); + // rawSelector is used by HeadersExchange and is not a JMS Selector + if (rawSelector != null) + { + ft.addAll(rawSelector); + } + + // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a + // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise + // possible to determine when querying the broker whether there are no arguments or just a non-matching selector + // argument, as specifying null for the arguments when querying means they should not be checked at all + ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector); + + _arguments = ft; } public AMQDestination getDestination() @@ -186,7 +222,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa { checkPreConditions(); - return _messageSelector; + return _messageSelectorFilter == null ? null :_messageSelectorFilter.getSelector(); } public MessageListener getMessageListener() throws JMSException @@ -345,6 +381,11 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa return _receiving.get(); } + public MessageFilter getMessageSelectorFilter() + { + return _messageSelectorFilter; + } + public Message receive() throws JMSException { return receive(0); @@ -874,9 +915,9 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa return _autoClose; } - public boolean isNoConsume() + public boolean isBrowseOnly() { - return _noConsume; + return _browseOnly; } public void rollback() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 3c24c67f9b..47c20b683c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -20,22 +20,20 @@ package org.apache.qpid.client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.client.AMQDestination.AddressOption; -import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQInternalException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.transport.*; -import org.apache.qpid.filter.MessageFilter; -import org.apache.qpid.filter.JMSSelectorFilter; +import org.apache.qpid.jms.Session; -import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; + import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; @@ -50,11 +48,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer session, AMQProtocolHandler protocolHandler, + FieldTable rawSelector, int prefetchHigh, int prefetchLow, + boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException { super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler, - arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose); + rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose); _0_10session = (AMQSession_0_10) session; - if (messageSelector != null && !messageSelector.equals("")) - { - try - { - _filter = new JMSSelectorFilter(messageSelector); - } - catch (AMQInternalException e) - { - throw new InvalidSelectorException("cannot create consumer because of selector issue"); - } - if (destination instanceof AMQQueue) - { - _preAcquire = false; - } - } - - // Destination setting overrides connection defaults - if (destination.getDestSyntax() == DestSyntax.ADDR && - destination.getLink().getConsumerCapacity() > 0) - { - capacity = destination.getLink().getConsumerCapacity(); - } - else if (getSession().prefetch()) - { - capacity = _0_10session.getAMQConnection().getMaxPrefetch(); - } + + _preAcquire = evaluatePreAcquire(browseOnly, destination); + + _capacity = evaluateCapacity(destination); + _serverJmsSelectorSupport = connection.isSupportedServerFeature(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR); + if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType()) { @@ -122,7 +98,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer 0) - { - result = true; - } + final RangeSet acquired = acq.getTransfers(); + if (acquired != null && acquired.size() > 0) + { + result = true; } return result; } + private void messageFlow() + { + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); + } public void setMessageListener(final MessageListener messageListener) throws JMSException { super.setMessageListener(messageListener); try { - if (messageListener != null && capacity == 0) + if (messageListener != null && _capacity == 0) { - _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1, - Option.UNRELIABLE); + messageFlow(); } if (messageListener != null && !_synchronousQueue.isEmpty()) { @@ -389,9 +365,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer 0) + if (_capacity > 0) { _0_10session.getQpidSession().messageFlow (getConsumerTagString(), MessageCreditUnit.MESSAGE, - capacity, + _capacity, Option.UNRELIABLE); } _0_10session.syncDispatchQueue(); o = super.getMessageFromQueue(-1); } - if (capacity == 0) + if (_capacity == 0) { _syncReceive.set(false); } @@ -448,16 +420,26 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer 0) + { + capacity = destination.getLink().getConsumerCapacity(); + } + else if (getSession().prefetch()) + { + capacity = _0_10session.getAMQConnection().getMaxPrefetch(); + } + return capacity; + } + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index 00acd5e866..cf1d7cedeb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -20,16 +20,14 @@ */ package org.apache.qpid.client; -import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.Message; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQInternalException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.filter.JMSSelectorFilter; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,24 +38,23 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer 0) - { - JMSSelectorFilter _filter = new JMSSelectorFilter(messageSelector); - } + consumerArguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE); } - catch (AMQInternalException e) + + if (isBrowseOnly()) { - throw new InvalidSelectorException("cannot create consumer because of selector issue"); + consumerArguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE); } + } void sendCancel() throws AMQException, FailoverException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 57f64c2f92..16afa51c74 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -238,7 +238,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer } catch (Exception e) { - JMSException jmse = new JMSException("Exception when sending message"); + JMSException jmse = new JMSException("Exception when sending message:" + e.getMessage()); jmse.setLinkedException(e); jmse.initCause(e); throw jmse; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java index 5f97d625b4..c73d800b14 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -20,18 +20,14 @@ */ package org.apache.qpid.client.messaging.address; -import static org.apache.qpid.client.messaging.address.Link.Reliability.UNSPECIFIED; - import java.util.HashMap; import java.util.Map; -import org.apache.qpid.client.messaging.address.Node.QueueNode; - public class Link { public enum FilterType { SQL92, XQUERY, SUBJECT } - public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE, UNSPECIFIED } + public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE } protected String name; protected String _filter; @@ -42,7 +38,7 @@ public class Link protected int _producerCapacity = 0; protected Node node; protected Subscription subscription; - protected Reliability reliability = UNSPECIFIED; + protected Reliability reliability = Reliability.AT_LEAST_ONCE; public Reliability getReliability() { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index a938bd47f8..b7253e6e9c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -47,6 +47,7 @@ import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.TransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -362,7 +363,15 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public void closeProtocolSession() throws AMQException { - _protocolHandler.closeConnection(0); + try + { + _protocolHandler.getNetworkConnection().close(); + } + catch(TransportException e) + { + //ignore such exceptions, they were already logged + //and this is a forcible close. + } } public void failover(String host, int port) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java b/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java index 4159986090..a1b4aff659 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java @@ -26,20 +26,21 @@ import org.slf4j.LoggerFactory; public class JMSSelectorFilter implements MessageFilter { - /** - * this JMSSelectorFilter's logger - */ private static final Logger _logger = LoggerFactory.getLogger(JMSSelectorFilter.class); - private String _selector; - private BooleanExpression _matcher; + private final String _selector; + private final BooleanExpression _matcher; public JMSSelectorFilter(String selector) throws AMQInternalException { + if (selector == null || "".equals(selector)) + { + throw new IllegalArgumentException("Cannot create a JMSSelectorFilter with a null or empty selector string"); + } _selector = selector; - if (JMSSelectorFilter._logger.isDebugEnabled()) + if (_logger.isDebugEnabled()) { - JMSSelectorFilter._logger.debug("Created JMSSelectorFilter with selector:" + _selector); + _logger.debug("Created JMSSelectorFilter with selector:" + _selector); } _matcher = new SelectorParser().parse(selector); } @@ -49,16 +50,15 @@ public class JMSSelectorFilter implements MessageFilter try { boolean match = _matcher.matches(message); - if (JMSSelectorFilter._logger.isDebugEnabled()) + if (_logger.isDebugEnabled()) { - JMSSelectorFilter._logger.debug(message + " match(" + match + ") selector(" + System - .identityHashCode(_selector) + "):" + _selector); + _logger.debug(message + " match(" + match + ") selector(" + _selector + "): " + _selector); } return match; } catch (AMQInternalException e) { - JMSSelectorFilter._logger.warn("Caght exception when evaluating message selector for message " + message, e); + _logger.warn("Caught exception when evaluating message selector for message " + message, e); } return false; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java b/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java index 62e4a28c1e..ec0e8ea4c0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java @@ -17,11 +17,11 @@ */ package org.apache.qpid.filter; -import org.apache.qpid.AMQInternalException; import org.apache.qpid.client.message.AbstractJMSMessage; public interface MessageFilter { - boolean matches(AbstractJMSMessage message) throws AMQInternalException; + boolean matches(AbstractJMSMessage message); + String getSelector(); } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java index 849827216c..68531eee84 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java @@ -29,7 +29,6 @@ import javax.jms.MessageProducer; import junit.framework.TestCase; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.transport.Binary; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.Connection.SessionFactory; @@ -334,7 +333,7 @@ public class AMQSession_0_10Test extends TestCase try { BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, - null, new FieldTable(), false, true); + null, null, false, true); session.sendConsume(consumer, new AMQShortString("test"), null, true, null, 1); } catch (Exception e) @@ -383,7 +382,7 @@ public class AMQSession_0_10Test extends TestCase try { BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, - null, new FieldTable(), false, true); + null, null, false, true); session.start(); consumer.receive(1); fail("JMSException should be thrown"); @@ -401,7 +400,7 @@ public class AMQSession_0_10Test extends TestCase try { BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, - null, new FieldTable(), false, true); + null, null, false, true); session.start(); consumer.receive(1); } @@ -419,7 +418,7 @@ public class AMQSession_0_10Test extends TestCase try { BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, - null, new FieldTable(), false, true); + null, null, false, true); session.start(); consumer.receiveNoWait(); fail("JMSException should be thrown"); @@ -437,7 +436,7 @@ public class AMQSession_0_10Test extends TestCase try { BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, - null, new FieldTable(), false, true); + null, null, false, true); consumer.setMessageListener(new MockMessageListener()); fail("JMSException should be thrown"); } @@ -454,7 +453,7 @@ public class AMQSession_0_10Test extends TestCase try { BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, - null, new FieldTable(), false, true); + null, null, false, true); consumer.setMessageListener(new MockMessageListener()); } catch (Exception e) @@ -471,7 +470,7 @@ public class AMQSession_0_10Test extends TestCase try { BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, - null, new FieldTable(), false, true); + null, null, false, true); consumer.close(); } catch (Exception e) @@ -488,7 +487,7 @@ public class AMQSession_0_10Test extends TestCase try { BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, - null, new FieldTable(), false, true); + null, null, false, true); consumer.close(); fail("JMSException should be thrown"); } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/filter/JMSSelectorFilterTest.java b/qpid/java/client/src/test/java/org/apache/qpid/filter/JMSSelectorFilterTest.java new file mode 100644 index 0000000000..d4d8ea4350 --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/filter/JMSSelectorFilterTest.java @@ -0,0 +1,108 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.filter; + +import junit.framework.TestCase; + +import org.apache.qpid.AMQInternalException; +import org.apache.qpid.client.message.JMSTextMessage; +import org.apache.qpid.client.message.TestMessageHelper; + +public class JMSSelectorFilterTest extends TestCase +{ + + public void testEmptySelectorFilter() throws Exception + { + try + { + new JMSSelectorFilter(""); + fail("Should not be able to create a JMSSelectorFilter with an empty selector"); + } + catch (IllegalArgumentException iae) + { + // pass + } + } + + public void testNullSelectorFilter() throws Exception + { + try + { + new JMSSelectorFilter(null); + fail("Should not be able to create a JMSSelectorFilter with a null selector"); + } + catch (IllegalArgumentException iae) + { + // pass + } + } + + public void testInvalidSelectorFilter() throws Exception + { + try + { + new JMSSelectorFilter("$%^"); + fail("Unparsable selector so expected AMQInternalException to be thrown"); + } + catch (AMQInternalException amqie) + { + // pass + } + } + + public void testSimpleSelectorFilter() throws Exception + { + MessageFilter simpleSelectorFilter = new JMSSelectorFilter("select=5"); + + assertNotNull("Filter object is null", simpleSelectorFilter); + assertNotNull("Selector string is null", simpleSelectorFilter.getSelector()); + assertEquals("Unexpected selector", "select=5", simpleSelectorFilter.getSelector()); + assertTrue("Filter object is invalid", simpleSelectorFilter != null); + + final JMSTextMessage message = TestMessageHelper.newJMSTextMessage(); + + message.setIntProperty("select", 4); + assertFalse("Selector did match when not expected", simpleSelectorFilter.matches(message)); + message.setIntProperty("select", 5); + assertTrue("Selector didnt match when expected", simpleSelectorFilter.matches(message)); + message.setIntProperty("select", 6); + assertFalse("Selector did match when not expected", simpleSelectorFilter.matches(message)); + } + + public void testFailedMatchingFilter() throws Exception + { + MessageFilter simpleSelectorFilter = new JMSSelectorFilter("select>4"); + + assertNotNull("Filter object is null", simpleSelectorFilter); + assertNotNull("Selector string is null", simpleSelectorFilter.getSelector()); + assertEquals("Unexpected selector", "select>4", simpleSelectorFilter.getSelector()); + assertTrue("Filter object is invalid", simpleSelectorFilter != null); + + final JMSTextMessage message = TestMessageHelper.newJMSTextMessage(); + + message.setStringProperty("select", "5"); + assertFalse("Selector matched when not expected", simpleSelectorFilter.matches(message)); + message.setStringProperty("select", "elephant"); + assertFalse("Selector matched when not expected", simpleSelectorFilter.matches(message)); + message.setBooleanProperty("select", false); + assertFalse("Selector matched when not expected", simpleSelectorFilter.matches(message)); + } +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java index b5e7ae82b5..cd18b5181f 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java @@ -47,12 +47,17 @@ public class MessageConverterTest extends TestCase protected JMSTextMessage testTextMessage; protected JMSMapMessage testMapMessage; - private AMQSession _session = new TestAMQSession(); + private AMQConnection _connection; + private AMQSession _session; protected void setUp() throws Exception { super.setUp(); + + _connection = new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='tcp://localhost:1'"); + _session = new TestAMQSession(_connection); + testTextMessage = new JMSTextMessage(AMQMessageDelegateFactory.FACTORY_0_8); //Set Message Text diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java index 6759b43387..06d0f4a3f9 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java @@ -29,22 +29,25 @@ import javax.jms.Topic; import javax.jms.TopicSubscriber; import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.BasicMessageConsumer_0_8; import org.apache.qpid.client.BasicMessageProducer_0_8; +import org.apache.qpid.client.MockAMQConnection; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.filter.MessageFilter; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; public class TestAMQSession extends AMQSession { - public TestAMQSession() + public TestAMQSession(AMQConnection connection) { - super(null, 0, false, AUTO_ACKNOWLEDGE, null, 0, 0); + super(connection, 0, false, AUTO_ACKNOWLEDGE, null, 0, 0); } public void acknowledgeMessage(long deliveryTag, boolean multiple) @@ -124,7 +127,7 @@ public class TestAMQSession extends AMQSession - + diff --git a/qpid/java/common/src/main/java/common.bnd b/qpid/java/common/src/main/java/common.bnd index f12fbf9273..64e80c9b43 100755 --- a/qpid/java/common/src/main/java/common.bnd +++ b/qpid/java/common/src/main/java/common.bnd @@ -17,7 +17,7 @@ # under the License. # -ver: 0.13.0 +ver: 0.15.0 Bundle-SymbolicName: qpid-common Bundle-Version: ${ver} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java b/qpid/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java new file mode 100644 index 0000000000..aa262bdde5 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.common; + +/** + * Keys names used within the serverProperties argument of the ConnectionStart + * method. These property names are Qpid specific. + */ +public final class ServerPropertyNames +{ + /** + * Server property: federation tag UUID + */ + public static final String FEDERATION_TAG = "qpid.federation_tag"; + + /** + * Server property: array of features supported by the server. + */ + public static final String QPID_FEATURES = "qpid.features"; + + /** + * Feature: Signifies that a server supports JMS selectors. + */ + public static final String FEATURE_QPID_JMS_SELECTOR = "qpid.jms-selector"; +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index 721c821bab..4a126b8504 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -832,9 +832,12 @@ public class FieldTable public void addAll(FieldTable fieldTable) { initMapIfNecessary(); - _encodedForm = null; - _properties.putAll(fieldTable._properties); - recalculateEncodedSize(); + if (fieldTable._properties != null) + { + _encodedForm = null; + _properties.putAll(fieldTable._properties); + recalculateEncodedSize(); + } } public static Map convertToMap(final FieldTable fieldTable) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 1c521244d0..b78433052c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -526,10 +526,6 @@ public class Connection extends ConnectionInvoker { synchronized (lock) { - for (Session ssn : channels.values()) - { - ssn.closeCode(close); - } ConnectionCloseCode code = close.getReplyCode(); if (code != ConnectionCloseCode.NORMAL) { @@ -701,8 +697,17 @@ public class Connection extends ConnectionInvoker return channels.values(); } - public boolean hasSessionWithName(final String name) + public boolean hasSessionWithName(final byte[] name) { - return sessions.containsKey(new Binary(name.getBytes())); + return sessions.containsKey(new Binary(name)); + } + + public void notifyFailoverRequired() + { + List values = new ArrayList(channels.values()); + for (Session ssn : values) + { + ssn.notifyFailoverRequired(); + } } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java index 82fa6ca473..07d21c9904 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java @@ -194,11 +194,18 @@ public class ServerDelegate extends ConnectionDelegate @Override public void sessionAttach(Connection conn, SessionAttach atc) + { + sessionAttachImpl(conn, atc); + } + + protected Session sessionAttachImpl(Connection conn, SessionAttach atc) { Session ssn = getSession(conn, atc); conn.map(ssn, atc.getChannel()); ssn.sessionAttached(atc.getName()); ssn.setState(Session.State.OPEN); + + return ssn; } protected void setConnectionTuneOkChannelMax(final Connection conn, final int okChannelMax) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index 0de558d152..321e5256b2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -50,6 +50,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * Session @@ -125,6 +126,8 @@ public class Session extends SessionInvoker private SessionDetachCode detachCode; private final Object stateLock = new Object(); + private final AtomicBoolean _failoverRequired = new AtomicBoolean(false); + protected Session(Connection connection, Binary name, long expiry) { this(connection, new SessionDelegate(), name, expiry); @@ -257,6 +260,7 @@ public class Session extends SessionInvoker void resume() { + _failoverRequired.set(false); synchronized (commands) { attach(); @@ -459,7 +463,7 @@ public class Session extends SessionInvoker synchronized (commands) { - if (state == DETACHED || state == CLOSING) + if (state == DETACHED || state == CLOSING || state == CLOSED) { return; } @@ -583,30 +587,25 @@ public class Session extends SessionInvoker synchronized (commands) { - //allow the txSelect operation to be invoked during resume - boolean skipWait = m instanceof TxSelect && state == RESUMING; - - if(!skipWait) + if (state == DETACHED && m.isUnreliable()) { - if (state == DETACHED && m.isUnreliable()) + Thread current = Thread.currentThread(); + if (!current.equals(resumer)) { - Thread current = Thread.currentThread(); - if (!current.equals(resumer)) - { - return; - } + return; } + } - if (state != OPEN && state != CLOSED && state != CLOSING) + if (state != OPEN && state != CLOSED && state != CLOSING) + { + Thread current = Thread.currentThread(); + if (!current.equals(resumer) ) { - Thread current = Thread.currentThread(); - if (!current.equals(resumer)) + Waiter w = new Waiter(commands, timeout); + while (w.hasTime() && (state != OPEN && state != CLOSED)) { - Waiter w = new Waiter(commands, timeout); - while (w.hasTime() && (state != OPEN && state != CLOSED)) - { - w.await(); - } + checkFailoverRequired("Command was interrupted because of failover, before being sent"); + w.await(); } } } @@ -674,6 +673,7 @@ public class Session extends SessionInvoker } } } + checkFailoverRequired("Command was interrupted because of failover, before being sent"); w.await(); } } @@ -768,6 +768,14 @@ public class Session extends SessionInvoker } } + private void checkFailoverRequired(String message) + { + if (_failoverRequired.get()) + { + throw new SessionException(message); + } + } + protected boolean shouldIssueFlush(int next) { return (next % 65536) == 0; @@ -793,6 +801,7 @@ public class Session extends SessionInvoker Waiter w = new Waiter(commands, timeout); while (w.hasTime() && state != CLOSED && lt(maxComplete, point)) { + checkFailoverRequired("Session sync was interrupted by failover."); log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, commands); w.await(); } @@ -853,13 +862,6 @@ public class Session extends SessionInvoker } } - private ConnectionClose close = null; - - void closeCode(ConnectionClose close) - { - this.close = close; - } - ExecutionException getException() { synchronized (results) @@ -910,6 +912,7 @@ public class Session extends SessionInvoker Waiter w = new Waiter(this, timeout); while (w.hasTime() && state != CLOSED && !isDone()) { + checkFailoverRequired("Operation was interrupted by failover."); log.debug("%s waiting for result: %s", Session.this, this); w.await(); } @@ -921,7 +924,12 @@ public class Session extends SessionInvoker } else if (state == CLOSED) { - throw new SessionException(getException()); + ExecutionException ex = getException(); + if(ex == null) + { + throw new SessionClosedException(); + } + throw new SessionException(ex); } else { @@ -1001,6 +1009,7 @@ public class Session extends SessionInvoker Waiter w = new Waiter(commands, timeout); while (w.hasTime() && state != CLOSED) { + checkFailoverRequired("close() was interrupted by failover."); w.await(); } @@ -1095,6 +1104,7 @@ public class Session extends SessionInvoker Waiter w = new Waiter(stateLock, timeout); while (w.hasTime() && state == NEW) { + checkFailoverRequired("Session opening was interrupted by failover."); w.await(); } } @@ -1117,4 +1127,26 @@ public class Session extends SessionInvoker { return stateLock; } + + protected void notifyFailoverRequired() + { + //ensure any operations waiting are aborted to + //prevent them waiting for timeout for 60 seconds + //and possibly preventing failover proceeding + _failoverRequired.set(true); + synchronized (commands) + { + commands.notifyAll(); + } + synchronized (results) + { + for (ResultFuture result : results.values()) + { + synchronized(result) + { + result.notifyAll(); + } + } + } + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java b/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java index a6a8b8beb4..fe1a300479 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java @@ -257,4 +257,19 @@ public final class Strings return join(sep, Arrays.asList(items)); } + public static String printMap(Map map) + { + StringBuilder sb = new StringBuilder(); + sb.append("<"); + if (map != null) + { + for(String key : map.keySet()) + { + sb.append(key).append(" = ").append(map.get(key)).append(" "); + } + } + sb.append(">"); + return sb.toString(); + } + } diff --git a/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java b/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java index bb4c9c3884..bd189feb1c 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java @@ -20,17 +20,19 @@ */ package org.apache.qpid.framing; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + import junit.framework.Assert; import junit.framework.TestCase; -import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.AMQPInvalidClassException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.*; - public class PropertyFieldTableTest extends TestCase { private static final Logger _logger = LoggerFactory.getLogger(PropertyFieldTableTest.class); @@ -106,7 +108,7 @@ public class PropertyFieldTableTest extends TestCase table1.setByte("value", Byte.MAX_VALUE); Assert.assertTrue(table1.propertyExists("value")); - // Tets lookups we shouldn't get anything back for other gets + // Tests lookups we shouldn't get anything back for other gets // we should get right value back for this type .... Assert.assertEquals(null, table1.getBoolean("value")); Assert.assertEquals(Byte.MAX_VALUE, (byte) table1.getByte("value")); @@ -139,7 +141,7 @@ public class PropertyFieldTableTest extends TestCase table1.setShort("value", Short.MAX_VALUE); Assert.assertTrue(table1.propertyExists("value")); - // Tets lookups we shouldn't get anything back for other gets + // Tests lookups we shouldn't get anything back for other gets // we should get right value back for this type .... Assert.assertEquals(null, table1.getBoolean("value")); Assert.assertEquals(null, table1.getByte("value")); @@ -172,7 +174,7 @@ public class PropertyFieldTableTest extends TestCase table1.setChar("value", 'c'); Assert.assertTrue(table1.propertyExists("value")); - // Tets lookups we shouldn't get anything back for other gets + // Tests lookups we shouldn't get anything back for other gets // we should get right value back for this type .... Assert.assertEquals(null, table1.getBoolean("value")); Assert.assertEquals(null, table1.getByte("value")); @@ -206,7 +208,7 @@ public class PropertyFieldTableTest extends TestCase table1.setDouble("value", Double.MAX_VALUE); Assert.assertTrue(table1.propertyExists("value")); - // Tets lookups we shouldn't get anything back for other gets + // Tests lookups we shouldn't get anything back for other gets // we should get right value back for this type .... Assert.assertEquals(null, table1.getBoolean("value")); Assert.assertEquals(null, table1.getByte("value")); @@ -241,7 +243,7 @@ public class PropertyFieldTableTest extends TestCase table1.setFloat("value", Float.MAX_VALUE); Assert.assertTrue(table1.propertyExists("value")); - // Tets lookups we shouldn't get anything back for other gets + // Tests lookups we shouldn't get anything back for other gets // we should get right value back for this type .... Assert.assertEquals(null, table1.getBoolean("value")); Assert.assertEquals(null, table1.getByte("value")); @@ -404,7 +406,7 @@ public class PropertyFieldTableTest extends TestCase table1.setString("value", "Hello"); Assert.assertTrue(table1.propertyExists("value")); - // Tets lookups we shouldn't get anything back for other gets + // Test lookups we shouldn't get anything back for other gets // we should get right value back for this type .... Assert.assertEquals(null, table1.getBoolean("value")); Assert.assertEquals(null, table1.getByte("value")); @@ -569,7 +571,7 @@ public class PropertyFieldTableTest extends TestCase Assert.assertEquals("Hello", table.getObject("object-string")); } - public void testwriteBuffer() throws IOException + public void testWriteBuffer() throws IOException { byte[] bytes = { 99, 98, 97, 96, 95 }; @@ -950,6 +952,36 @@ public class PropertyFieldTableTest extends TestCase } + public void testAddAll() + { + final FieldTable table1 = new FieldTable(); + table1.setInteger("int1", 1); + table1.setInteger("int2", 2); + assertEquals("Unexpected number of entries in table1", 2, table1.size()); + + final FieldTable table2 = new FieldTable(); + table2.setInteger("int3", 3); + table2.setInteger("int4", 4); + assertEquals("Unexpected number of entries in table2", 2, table2.size()); + + table1.addAll(table2); + assertEquals("Unexpected number of entries in table1 after addAll", 4, table1.size()); + assertEquals(Integer.valueOf(3), table1.getInteger("int3")); + } + + public void testAddAllWithEmptyFieldTable() + { + final FieldTable table1 = new FieldTable(); + table1.setInteger("int1", 1); + table1.setInteger("int2", 2); + assertEquals("Unexpected number of entries in table1", 2, table1.size()); + + final FieldTable emptyFieldTable = new FieldTable(); + + table1.addAll(emptyFieldTable); + assertEquals("Unexpected number of entries in table1 after addAll", 2, table1.size()); + } + private void assertBytesEqual(byte[] expected, byte[] actual) { Assert.assertEquals(expected.length, actual.length); diff --git a/qpid/java/ivy.xml b/qpid/java/ivy.xml index 1399db5248..899c5fa79b 100644 --- a/qpid/java/ivy.xml +++ b/qpid/java/ivy.xml @@ -18,7 +18,7 @@ - + @@ -33,6 +33,24 @@ + + + + + + + + + + + + + + + + + + diff --git a/qpid/java/lib/poms/je-4.0.117.xml b/qpid/java/lib/poms/je-4.0.117.xml new file mode 100644 index 0000000000..85573a99f9 --- /dev/null +++ b/qpid/java/lib/poms/je-4.0.117.xml @@ -0,0 +1,22 @@ + + + + com.sleepycat + je + 4.0.117 + diff --git a/qpid/java/management/common/build.xml b/qpid/java/management/common/build.xml index ce2ec3a106..1a0520b1e3 100644 --- a/qpid/java/management/common/build.xml +++ b/qpid/java/management/common/build.xml @@ -19,8 +19,9 @@ - --> - + + diff --git a/qpid/java/management/common/src/main/java/management-common.bnd b/qpid/java/management/common/src/main/java/management-common.bnd index 5c39329f2f..5f7f8bede3 100644 --- a/qpid/java/management/common/src/main/java/management-common.bnd +++ b/qpid/java/management/common/src/main/java/management-common.bnd @@ -17,7 +17,7 @@ # under the License. # -ver: 0.13.0 +ver: 0.15.0 Bundle-SymbolicName: qpid-management-common Bundle-Version: ${ver} diff --git a/qpid/java/management/eclipse-plugin/META-INF/MANIFEST.MF b/qpid/java/management/eclipse-plugin/META-INF/MANIFEST.MF index 0df308dff1..f47bc86797 100644 --- a/qpid/java/management/eclipse-plugin/META-INF/MANIFEST.MF +++ b/qpid/java/management/eclipse-plugin/META-INF/MANIFEST.MF @@ -3,7 +3,7 @@ Bundle-License: http://www.apache.org/licenses/LICENSE-2.0.txt Bundle-ManifestVersion: 2 Bundle-Name: Qpid JMX Management Console Plug-in Bundle-SymbolicName: org.apache.qpid.management.ui; singleton:=true -Bundle-Version: 0.13.0 +Bundle-Version: 0.15.0 Bundle-Activator: org.apache.qpid.management.ui.Activator Bundle-Vendor: Apache Software Foundation Bundle-Localization: plugin diff --git a/qpid/java/systests/etc/config-systests-firewall-2.xml b/qpid/java/systests/etc/config-systests-firewall-2.xml index 2549a7e6c4..a9fd86b8e5 100644 --- a/qpid/java/systests/etc/config-systests-firewall-2.xml +++ b/qpid/java/systests/etc/config-systests-firewall-2.xml @@ -32,8 +32,8 @@ false false - /path/to/keystore.ks - keystorepass + /path/to/keystore.ks + keystorepass 5672 8672 diff --git a/qpid/java/systests/etc/config-systests-firewall-3.xml b/qpid/java/systests/etc/config-systests-firewall-3.xml index 0cafb6d70a..f0f3423f43 100644 --- a/qpid/java/systests/etc/config-systests-firewall-3.xml +++ b/qpid/java/systests/etc/config-systests-firewall-3.xml @@ -28,12 +28,12 @@ + To disable Non-SSL port set sslOnly to true --> false false - /path/to/keystore.ks - keystorepass + /path/to/keystore.ks + keystorepass 5672 8672 diff --git a/qpid/java/systests/etc/config-systests-settings.xml b/qpid/java/systests/etc/config-systests-settings.xml index 5ed208bfe7..88533400d3 100644 --- a/qpid/java/systests/etc/config-systests-settings.xml +++ b/qpid/java/systests/etc/config-systests-settings.xml @@ -25,8 +25,8 @@ 15671 false false - ${QPID_HOME}/../test-profiles/test_resources/ssl/java_broker_keystore.jks - password + ${QPID_HOME}/../test-profiles/test_resources/ssl/java_broker_keystore.jks + password diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java index 986297bfe1..7ea4416f3b 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java @@ -35,13 +35,11 @@ import org.slf4j.LoggerFactory; public class AMQQueueDeferredOrderingTest extends QpidBrokerTestCase { - - private static final int NUM_MESSAGES = 1000; - private Connection con; private Session session; private AMQQueue queue; private MessageConsumer consumer; + private int numMessages; private static final Logger _logger = LoggerFactory.getLogger(AMQQueueDeferredOrderingTest.class); @@ -87,6 +85,8 @@ public class AMQQueueDeferredOrderingTest extends QpidBrokerTestCase { super.setUp(); + numMessages = isBrokerStorePersistent() ? 300 : 1000; + _logger.info("Create Connection"); con = getConnection(); _logger.info("Create Session"); @@ -105,19 +105,19 @@ public class AMQQueueDeferredOrderingTest extends QpidBrokerTestCase // Setup initial messages _logger.info("Creating first producer thread"); - producerThread = new ASyncProducer(queue, 0, NUM_MESSAGES / 2); + producerThread = new ASyncProducer(queue, 0, numMessages / 2); producerThread.start(); // Wait for them to be done producerThread.join(); // Setup second set of messages to produce while we consume _logger.info("Creating second producer thread"); - producerThread = new ASyncProducer(queue, NUM_MESSAGES / 2, NUM_MESSAGES); + producerThread = new ASyncProducer(queue, numMessages / 2, numMessages); producerThread.start(); // Start consuming and checking they're in order _logger.info("Consuming messages"); - for (int i = 0; i < NUM_MESSAGES; i++) + for (int i = 0; i < numMessages; i++) { Message msg = consumer.receive(3000); assertNotNull("Message should not be null", msg); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java index 303da29389..40a0d32b01 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java @@ -135,7 +135,7 @@ public class ResetMessageListenerTest extends QpidBrokerTestCase try { assertTrue("Did not receive all first batch of messages", - _allFirstMessagesSent.await(1000, TimeUnit.MILLISECONDS)); + _allFirstMessagesSent.await(MSG_COUNT, TimeUnit.SECONDS)); _logger.info("Received first batch of messages"); } catch (InterruptedException e) @@ -212,7 +212,7 @@ public class ResetMessageListenerTest extends QpidBrokerTestCase try { - assertTrue(_allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS)); + assertTrue(_allSecondMessagesSent.await(MSG_COUNT, TimeUnit.SECONDS)); } catch (InterruptedException e) { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java new file mode 100644 index 0000000000..980fc7285d --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java @@ -0,0 +1,14 @@ +package org.apache.qpid.client.failover; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Session; + +public class AddressBasedFailoverBehaviourTest extends FailoverBehaviourTest +{ + @Override + protected Destination createDestination(Session session) throws JMSException + { + return session.createQueue("ADDR:" +getTestQueueName() + "_" + System.currentTimeMillis() + "; {create: always}"); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java index 66f8fe0546..a5b9c618bc 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java @@ -20,6 +20,7 @@ package org.apache.qpid.client.failover; import java.text.MessageFormat; import java.util.ArrayList; +import java.util.Enumeration; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -33,12 +34,15 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.TransactionRolledBackException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.test.utils.FailoverBaseCase; /** @@ -96,6 +100,11 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio */ private JMSException _exceptionListenerException; + /** + * Latch to check that failover mutex is hold by a failover thread + */ + private CountDownLatch _failoverStarted; + @Override protected void setUp() throws Exception { @@ -105,6 +114,7 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio _connection.setExceptionListener(this); ((AMQConnection) _connection).setConnectionListener(this); _failoverComplete = new CountDownLatch(1); + _failoverStarted = new CountDownLatch(1); } /** @@ -625,8 +635,134 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio sessionCloseAfterFailoverImpl(Session.AUTO_ACKNOWLEDGE); } + public void testPublishAutoAcknowledgedWhileFailover() throws Exception + { + publishWhileFailingOver(Session.AUTO_ACKNOWLEDGE); + } + + public void testPublishClientAcknowledgedWhileFailover() throws Exception + { + Message receivedMessage = publishWhileFailingOver(Session.CLIENT_ACKNOWLEDGE); + receivedMessage.acknowledge(); + } + + public void testPublishTransactedAcknowledgedWhileFailover() throws Exception + { + publishWhileFailingOver(Session.SESSION_TRANSACTED); + _consumerSession.commit(); + } + + public void testPublishAutoAcknowledgedWithFailoverMutex() throws Exception + { + publishWithFailoverMutex(Session.AUTO_ACKNOWLEDGE); + } + + public void testPublishClientAcknowledgedWithFailoverMutex() throws Exception + { + publishWithFailoverMutex(Session.CLIENT_ACKNOWLEDGE); + + } + + public void testPublishTransactedAcknowledgedWithFailoverMutex() throws Exception + { + publishWithFailoverMutex(Session.SESSION_TRANSACTED); + } + + public void testClientAcknowledgedSessionCloseWhileFailover() throws Exception + { + sessionCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE); + } + + public void testTransactedSessionCloseWhileFailover() throws Exception + { + sessionCloseWhileFailoverImpl(Session.SESSION_TRANSACTED); + } + + public void testAutoAcknowledgedSessionCloseWhileFailover() throws Exception + { + sessionCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE); + } + + public void testClientAcknowledgedQueueBrowserCloseWhileFailover() throws Exception + { + browserCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE); + } + + public void testTransactedQueueBrowserCloseWhileFailover() throws Exception + { + browserCloseWhileFailoverImpl(Session.SESSION_TRANSACTED); + } + + public void testAutoAcknowledgedQueueBrowserCloseWhileFailover() throws Exception + { + browserCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE); + } + + private Message publishWhileFailingOver(int autoAcknowledge) throws JMSException, InterruptedException + { + setDelayedFailoverPolicy(5); + init(autoAcknowledge, true); + + String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0); + Message message = _producerSession.createTextMessage(text); + + failBroker(getFailingPort()); + + if(!_failoverStarted.await(5, TimeUnit.SECONDS)) + { + fail("Did not receieve notification failover had started"); + } + + _producer.send(message); + + if (_producerSession.getTransacted()) + { + _producerSession.commit(); + } + + Message receivedMessage = _consumer.receive(1000l); + assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0); + return receivedMessage; + } + + private void publishWithFailoverMutex(int autoAcknowledge) throws JMSException, InterruptedException + { + setDelayedFailoverPolicy(5); + init(autoAcknowledge, true); + + String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0); + Message message = _producerSession.createTextMessage(text); + + AMQConnection connection = (AMQConnection)_connection; + + // holding failover mutex should prevent the failover from + // proceeding before we try to send the message + synchronized(connection.getFailoverMutex()) + { + failBroker(getFailingPort()); + + // wait to make sure that connection is lost + while(!connection.isFailingOver()) + { + Thread.sleep(25l); + } + + try + { + _producer.send(message); + fail("Sending should fail because connection was lost and failover has not yet completed"); + } + catch(JMSException e) + { + // JMSException is expected + } + } + // wait for failover completion, thus ensuring it actually + //got started, before allowing the test to tear down + awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME); + } /** - * Tests {@link Session#close()} for session with given acknowledge mode + * Tests {@link Session#close()} for session with given acknowledge mode * to ensure that close works after failover. * * @param acknowledgeMode session acknowledge mode @@ -671,7 +807,7 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio boolean isTransacted = acknowledgeMode == Session.SESSION_TRANSACTED ? true : false; _consumerSession = _connection.createSession(isTransacted, acknowledgeMode); - _destination = _consumerSession.createQueue(getTestQueueName() + "_" + System.currentTimeMillis()); + _destination = createDestination(_consumerSession); _consumer = _consumerSession.createConsumer(_destination); if (startConnection) @@ -684,6 +820,11 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio } + protected Destination createDestination(Session session) throws JMSException + { + return session.createQueue(getTestQueueName() + "_" + System.currentTimeMillis()); + } + /** * Resends messages if reconnected to a non-clustered broker * @@ -879,6 +1020,7 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio @Override public boolean preFailover(boolean redirect) { + _failoverStarted.countDown(); return true; } @@ -900,6 +1042,39 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio _exceptionListenerException = e; } + /** + * Causes 1 second delay before reconnect in order to test whether JMS + * methods block while failover is in progress + */ + private static class DelayingFailoverPolicy extends FailoverPolicy + { + + private CountDownLatch _suspendLatch; + private long _delay; + + public DelayingFailoverPolicy(AMQConnection connection, long delay) + { + super(connection.getConnectionURL(), connection); + _suspendLatch = new CountDownLatch(1); + _delay = delay; + } + + public void attainedConnection() + { + try + { + _suspendLatch.await(_delay, TimeUnit.SECONDS); + } + catch (InterruptedException e) + { + // continue + } + super.attainedConnection(); + } + + } + + private class FailoverTestMessageListener implements MessageListener { // message counter @@ -946,4 +1121,101 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio return _counter.get(); } } + + /** + * Tests {@link Session#close()} for session with given acknowledge mode + * to ensure that it blocks until failover implementation restores connection. + * + * @param acknowledgeMode session acknowledge mode + * @throws JMSException + */ + private void sessionCloseWhileFailoverImpl(int acknowledgeMode) throws JMSException + { + initDelayedFailover(acknowledgeMode); + + // intentionally receive message but not commit or acknowledge it in + // case of transacted or CLIENT_ACK session + Message receivedMessage = _consumer.receive(1000l); + assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0); + + failBroker(getFailingPort()); + + // test whether session#close blocks while failover is in progress + _consumerSession.close(); + + assertFailoverException(); + } + + /** + * A helper method to instantiate {@link QueueBrowser} and publish test messages on a test queue for further browsing. + * + * @param acknowledgeMode session acknowledge mode + * @return queue browser + * @throws JMSException + */ + private QueueBrowser prepareQueueBrowser(int acknowledgeMode) throws JMSException + { + init(acknowledgeMode, false); + _consumer.close(); + QueueBrowser browser = _consumerSession.createBrowser((Queue) _destination); + _connection.start(); + + produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false); + if (acknowledgeMode == Session.SESSION_TRANSACTED) + { + _producerSession.commit(); + } + return browser; + } + + /** + * Tests {@link QueueBrowser#close()} for session with given acknowledge mode + * to ensure that it blocks until failover implementation restores connection. + * + * @param acknowledgeMode session acknowledge mode + * @throws JMSException + */ + private void browserCloseWhileFailoverImpl(int acknowledgeMode) throws JMSException + { + setDelayedFailoverPolicy(); + + QueueBrowser browser = prepareQueueBrowser(acknowledgeMode); + + @SuppressWarnings("unchecked") + Enumeration messages = browser.getEnumeration(); + Message receivedMessage = (Message) messages.nextElement(); + assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0); + + failBroker(getFailingPort()); + + browser.close(); + + assertFailoverException(); + } + + private DelayingFailoverPolicy initDelayedFailover(int acknowledgeMode) throws JMSException + { + DelayingFailoverPolicy failoverPolicy = setDelayedFailoverPolicy(); + init(acknowledgeMode, true); + produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false); + if (acknowledgeMode == Session.SESSION_TRANSACTED) + { + _producerSession.commit(); + } + return failoverPolicy; + } + + private DelayingFailoverPolicy setDelayedFailoverPolicy() + { + return setDelayedFailoverPolicy(2); + } + + private DelayingFailoverPolicy setDelayedFailoverPolicy(long delay) + { + AMQConnection amqConnection = (AMQConnection) _connection; + DelayingFailoverPolicy failoverPolicy = new DelayingFailoverPolicy(amqConnection, delay); + ((AMQConnection) _connection).setFailoverPolicy(failoverPolicy); + return failoverPolicy; + } + } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java new file mode 100644 index 0000000000..c0b07f239b --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java @@ -0,0 +1,116 @@ +package org.apache.qpid.client.prefetch; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PrefetchBehaviourTest extends QpidBrokerTestCase +{ + protected static final Logger _logger = LoggerFactory.getLogger(PrefetchBehaviourTest.class); + private Connection _normalConnection; + private AtomicBoolean _exceptionCaught; + private CountDownLatch _processingStarted; + private CountDownLatch _processingCompleted; + + protected void setUp() throws Exception + { + super.setUp(); + _normalConnection = getConnection(); + _exceptionCaught = new AtomicBoolean(); + _processingStarted = new CountDownLatch(1); + _processingCompleted = new CountDownLatch(1); + } + + /** + * Verifies that a slow processing asynchronous transacted consumer with prefetch=1 only + * gets 1 of the messages sent, with the second consumer picking up the others while the + * slow consumer is processing, i.e that prefetch=1 actually does what it says on the tin. + */ + public void testPrefetchOneWithAsynchronousTransactedConsumer() throws Exception + { + final long processingTime = 5000; + + //create a second connection with prefetch set to 1 + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString()); + Connection prefetch1Connection = getConnection(); + + prefetch1Connection.start(); + _normalConnection.start(); + + //create an asynchronous consumer with simulated slow processing + final Session prefetch1session = prefetch1Connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = prefetch1session.createQueue(getTestQueueName()); + MessageConsumer prefetch1consumer = prefetch1session.createConsumer(queue); + prefetch1consumer.setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + try + { + _logger.debug("starting processing"); + _processingStarted.countDown(); + _logger.debug("processing started"); + + //simulate message processing + Thread.sleep(processingTime); + + prefetch1session.commit(); + + _processingCompleted.countDown(); + } + catch(Exception e) + { + _logger.error("Exception caught in message listener"); + _exceptionCaught.set(true); + } + } + }); + + //create producer and send 5 messages + Session producerSession = _normalConnection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = producerSession.createProducer(queue); + + for (int i = 0; i < 5; i++) + { + producer.send(producerSession.createTextMessage("test")); + } + producerSession.commit(); + + //wait for the first message to start being processed by the async consumer + assertTrue("Async processing failed to start in allowed timeframe", _processingStarted.await(2000, TimeUnit.MILLISECONDS)); + _logger.debug("proceeding with test"); + + //try to consumer the other messages with another consumer while the async procesisng occurs + Session normalSession = _normalConnection.createSession(true, Session.AUTO_ACKNOWLEDGE); + MessageConsumer normalConsumer = normalSession.createConsumer(queue); + + Message msg; + // Check that other consumer gets the other 4 messages + for (int i = 0; i < 4; i++) + { + msg = normalConsumer.receive(1500); + assertNotNull("Consumer should receive 4 messages",msg); + } + msg = normalConsumer.receive(250); + assertNull("Consumer should not have received a 5th message",msg); + + //wait for the other consumer to finish to ensure it completes ok + _logger.debug("waiting for async consumer to complete"); + assertTrue("Async processing failed to complete in allowed timeframe", _processingStarted.await(processingTime + 2000, TimeUnit.MILLISECONDS)); + assertFalse("Unexpecte exception during async message processing",_exceptionCaught.get()); + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java index 460270e188..277e84d66d 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java @@ -122,7 +122,8 @@ public class MultipleTransactedBatchProducerTest extends QpidBrokerTestCase producer2.start(); //await delivery of the messages - boolean result = _receivedLatch.await(75, TimeUnit.SECONDS); + int timeout = isBrokerStorePersistent() ? 300 : 75; + boolean result = _receivedLatch.await(timeout, TimeUnit.SECONDS); assertNull("Test failed because: " + String.valueOf(_failMsg), _failMsg); assertTrue("Some of the messages were not all recieved in the given timeframe, remaining count was: "+_receivedLatch.getCount(), diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java index 74f50e8659..aeeecb2dff 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java @@ -110,18 +110,13 @@ public class QueueDepthWithSelectorTest extends QpidBrokerTestCase try { Connection connection = getConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + AMQSession session = (AMQSession)connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Thread.sleep(2000); - long queueDepth = ((AMQSession) session).getQueueDepth((AMQDestination) _queue); + long queueDepth = session.getQueueDepth((AMQDestination) _queue); assertEquals("Session reports Queue depth not as expected", expectedDepth, queueDepth); connection.close(); } - catch (InterruptedException e) - { - fail(e.getMessage()); - } catch (AMQException e) { fail(e.getMessage()); @@ -158,6 +153,10 @@ public class QueueDepthWithSelectorTest extends QpidBrokerTestCase { assertTrue("Message " + msgId + " not received.", msgIdRecevied[msgId]); } + + //do a synchronous op to ensure the acks are processed + //on the broker before proceeding + ((AMQSession)_clientSession).sync(); } /** diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java index 13a9dd73b8..107c730a7e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java @@ -94,7 +94,7 @@ public class CancelTest extends QpidBrokerTestCase browser.close(); MessageConsumer consumer = _clientSession.createConsumer(_queue); - assertNotNull( consumer.receive() ); + assertNotNull( consumer.receive(2000l) ); consumer.close(); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java index d6caf05d33..4eb328f091 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java @@ -261,8 +261,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase protected void checkOverlappingMultipleGetEnum(int expectedMessages, int browserEnumerationCount, String selector) throws JMSException { QueueBrowser queueBrowser = selector == null ? - _clientSession.createBrowser(_queue) : _clientSession.createBrowser(_queue); -// _clientSession.createBrowser(_queue) : _clientSession.createBrowser(_queue, selector); + _clientSession.createBrowser(_queue) : _clientSession.createBrowser(_queue, selector); Enumeration[] msgs = new Enumeration[browserEnumerationCount]; int[] msgCount = new int[browserEnumerationCount]; @@ -347,7 +346,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase protected void checkQueueDepthWithSelectors(int totalMessages, int clients) throws JMSException { - String selector = MESSAGE_ID_PROPERTY + " % " + clients; + String selector = MESSAGE_ID_PROPERTY + " % " + clients + " = 0" ; checkOverlappingMultipleGetEnum(totalMessages / clients, clients, selector); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index 8c3c247e2b..feae7c9573 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -1,4 +1,3 @@ -package org.apache.qpid.test.client.destination; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,11 +18,13 @@ package org.apache.qpid.test.client.destination; * under the License. * */ - +package org.apache.qpid.test.client.destination; import java.util.Collections; +import java.util.Enumeration; import java.util.HashMap; import java.util.Hashtable; +import java.util.List; import java.util.Map; import java.util.Properties; @@ -34,6 +35,7 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; +import javax.jms.QueueBrowser; import javax.jms.QueueReceiver; import javax.jms.QueueSession; import javax.jms.Session; @@ -475,13 +477,13 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { prod.send(jmsSession.createTextMessage("msg" + i) ); } - - for (int i=0; i< 9; i++) + Message msg = null; + for (int i=0; i< 10; i++) { - cons.receive(); + msg = cons.receive(RECEIVE_TIMEOUT); + assertNotNull("Should have received " + i + " message", msg); + assertEquals("Unexpected message received", "msg" + i, ((TextMessage)msg).getText()); } - Message msg = cons.receive(RECEIVE_TIMEOUT); - assertNotNull("Should have received the 10th message",msg); assertNull("Shouldn't have received the 11th message as capacity is 10",cons.receive(RECEIVE_TIMEOUT)); msg.acknowledge(); for (int i=11; i<16; i++) @@ -1068,19 +1070,6 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { assertTrue(e.getCause().getMessage().contains("The reliability mode 'exactly-once' is not yet supported")); } - - String addr4 = "ADDR:amq.topic/test;{link : {reliability : at-least-once}}"; - try - { - AMQAnyDestination dest = new AMQAnyDestination(addr4); - Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); - MessageConsumer cons = ssn.createConsumer(dest); - fail("An exception should be thrown indicating it's an unsupported combination"); - } - catch(Exception e) - { - assertTrue(e.getCause().getMessage().contains("AT-LEAST-ONCE is not yet supported for Topics")); - } } private void acceptModeTest(String address, int expectedQueueDepth) throws Exception @@ -1182,4 +1171,154 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertNotNull("The consumer on the queue bound to the alt-exchange should receive the message",cons.receive(1000)); cons.close(); } + + public void testQueueBrowserWithSelectorAutoAcknowledgement() throws Exception + { + assertQueueBrowserWithSelector(Session.AUTO_ACKNOWLEDGE); + } + + public void testQueueBrowserWithSelectorClientAcknowldgement() throws Exception + { + assertQueueBrowserWithSelector(Session.CLIENT_ACKNOWLEDGE); + } + + public void testQueueBrowserWithSelectorTransactedSession() throws Exception + { + assertQueueBrowserWithSelector(Session.SESSION_TRANSACTED); + } + + public void testConsumerWithSelectorAutoAcknowledgement() throws Exception + { + assertConsumerWithSelector(Session.AUTO_ACKNOWLEDGE); + } + + public void testConsumerWithSelectorClientAcknowldgement() throws Exception + { + assertConsumerWithSelector(Session.CLIENT_ACKNOWLEDGE); + } + + public void testConsumerWithSelectorTransactedSession() throws Exception + { + assertConsumerWithSelector(Session.SESSION_TRANSACTED); + } + + private void assertQueueBrowserWithSelector(int acknowledgement) throws Exception + { + String queueAddress = "ADDR:" + getTestQueueName() + ";{create: always}"; + + boolean transacted = acknowledgement == Session.SESSION_TRANSACTED; + Session session = _connection.createSession(transacted, acknowledgement); + + Queue queue = session.createQueue(queueAddress); + + final int numberOfMessages = 10; + List sentMessages = sendMessage(session, queue, numberOfMessages); + assertNotNull("Messages were not sent", sentMessages); + assertEquals("Unexpected number of messages were sent", numberOfMessages, sentMessages.size()); + + QueueBrowser browser = session.createBrowser(queue, INDEX + "%2=0"); + _connection.start(); + + Enumeration enumaration = browser.getEnumeration(); + + int counter = 0; + int expectedIndex = 0; + while (enumaration.hasMoreElements()) + { + Message m = enumaration.nextElement(); + assertNotNull("Expected not null message at step " + counter, m); + int messageIndex = m.getIntProperty(INDEX); + assertEquals("Unexpected index", expectedIndex, messageIndex); + expectedIndex += 2; + counter++; + } + assertEquals("Unexpected number of messsages received", 5, counter); + } + + private void assertConsumerWithSelector(int acknowledgement) throws Exception + { + String queueAddress = "ADDR:" + getTestQueueName() + ";{create: always}"; + + boolean transacted = acknowledgement == Session.SESSION_TRANSACTED; + Session session = _connection.createSession(transacted, acknowledgement); + + Queue queue = session.createQueue(queueAddress); + + final int numberOfMessages = 10; + List sentMessages = sendMessage(session, queue, numberOfMessages); + assertNotNull("Messages were not sent", sentMessages); + assertEquals("Unexpected number of messages were sent", numberOfMessages, sentMessages.size()); + + MessageConsumer consumer = session.createConsumer(queue, INDEX + "%2=0"); + + int expectedIndex = 0; + for (int i = 0; i < 5; i++) + { + Message m = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Expected not null message at step " + i, m); + int messageIndex = m.getIntProperty(INDEX); + assertEquals("Unexpected index", expectedIndex, messageIndex); + expectedIndex += 2; + + if (transacted) + { + session.commit(); + } + else if (acknowledgement == Session.CLIENT_ACKNOWLEDGE) + { + m.acknowledge(); + } + } + + Message m = consumer.receive(RECEIVE_TIMEOUT); + assertNull("Unexpected message received", m); + } + + /** + * Tests that a client using a session in {@link Session#CLIENT_ACKNOWLEDGE} can correctly + * recover a session and re-receive the same message. + */ + public void testTopicRereceiveAfterRecover() throws Exception + { + final Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); + final Destination topic = jmsSession.createTopic("ADDR:amq.topic/topic1; {link:{name: queue1}}"); + + final MessageProducer prod = jmsSession.createProducer(topic); + final MessageConsumer consForTopic1 = jmsSession.createConsumer(topic); + final Message sentMessage = jmsSession.createTextMessage("Hello"); + + prod.send(sentMessage); + Message receivedMessage = consForTopic1.receive(1000); + assertNotNull("message should be received by consumer", receivedMessage); + + jmsSession.recover(); + receivedMessage = consForTopic1.receive(1000); + assertNotNull("message should be re-received by consumer after recover", receivedMessage); + receivedMessage.acknowledge(); + } + + /** + * Tests that a client using a session in {@link Session#SESSION_TRANSACTED} can correctly + * rollback a session and re-receive the same message. + */ + public void testTopicRereceiveAfterRollback() throws Exception + { + final Session jmsSession = _connection.createSession(true,Session.SESSION_TRANSACTED); + final Destination topic = jmsSession.createTopic("ADDR:amq.topic/topic1; {link:{name: queue1}}"); + + final MessageProducer prod = jmsSession.createProducer(topic); + final MessageConsumer consForTopic1 = jmsSession.createConsumer(topic); + final Message sentMessage = jmsSession.createTextMessage("Hello"); + + prod.send(sentMessage); + jmsSession.commit(); + + Message receivedMessage = consForTopic1.receive(1000); + assertNotNull("message should be received by consumer", receivedMessage); + + jmsSession.rollback(); + receivedMessage = consForTopic1.receive(1000); + assertNotNull("message should be re-received by consumer after rollback", receivedMessage); + jmsSession.commit(); + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java index 53a7533869..57ff6a4fa2 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java @@ -205,7 +205,7 @@ public class AMQConnectionTest extends QpidBrokerTestCase Session consSessA = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumerA = consSessA.createConsumer(_queue); - Session producerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED); MessageProducer producer = producerSession.createProducer(_queue); // Send 3 messages @@ -213,6 +213,7 @@ public class AMQConnectionTest extends QpidBrokerTestCase { producer.send(producerSession.createTextMessage("test")); } + producerSession.commit(); MessageConsumer consumerB = null; // 0-8, 0-9, 0-9-1 prefetch is per session, not consumer. diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index d799b141c0..9ea116ae53 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -262,7 +262,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase producer.send(session1.createTextMessage("B")); _logger.info("Receive message on consumer 1 :expecting B"); - msg = consumer1.receive(500); + msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull("Consumer 1 should get message 'B'.", msg); assertEquals("Incorrect Message received on consumer1.", "B", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 1 :expecting null"); @@ -287,13 +287,13 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase else { _logger.info("Receive message on consumer 3 :expecting B"); - msg = consumer3.receive(500); + msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull("Consumer 3 should get message 'B'.", msg); assertEquals("Incorrect Message received on consumer3.", "B", ((TextMessage) msg).getText()); } _logger.info("Receive message on consumer 1 :expecting C"); - msg = consumer1.receive(500); + msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull("Consumer 1 should get message 'C'.", msg); assertEquals("Incorrect Message received on consumer1.", "C", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 1 :expecting null"); @@ -301,7 +301,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase assertNull("There should be no more messages for consumption on consumer1.", msg); _logger.info("Receive message on consumer 3 :expecting C"); - msg = consumer3.receive(500); + msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull("Consumer 3 should get message 'C'.", msg); assertEquals("Incorrect Message received on consumer3.", "C", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 3 :expecting null"); @@ -358,7 +358,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase // Send message and check that both consumers get it and only it. producer.send(session0.createTextMessage("A")); - msg = consumer1.receive(500); + msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull("Message should be available", msg); assertEquals("Message Text doesn't match", "A", ((TextMessage) msg).getText()); msg = consumer1.receive(500); @@ -729,7 +729,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase conn.start(); - Message rMsg = subB.receive(1000); + Message rMsg = subB.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull(rMsg); assertEquals("Content was wrong", "testResubscribeWithChangedSelectorAndRestart2", @@ -797,7 +797,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase conn.start(); - Message rMsg = subTwo.receive(1000); + Message rMsg = subTwo.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull(rMsg); assertEquals("Content was wrong", "testResubscribeWithChangedSelectorAndRestart1", diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index 0b1aeef8e9..826545a23d 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -20,17 +20,22 @@ */ package org.apache.qpid.test.unit.topic; +import javax.jms.Connection; import javax.jms.InvalidDestinationException; +import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.Topic; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.client.AMQTopicSessionAdaptor; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -306,51 +311,39 @@ public class TopicSessionTest extends QpidBrokerTestCase } /** - * This tests QPID-1191, where messages which are sent to a topic but are not consumed by a subscriber - * due to a selector can be leaked. - * @throws Exception + * This tests was added to demonstrate QPID-3542. The Java Client when used with the CPP Broker was failing to + * ack messages received that did not match the selector. This meant the messages remained indefinitely on the Broker. */ - public void testNonMatchingMessagesDoNotFillQueue() throws Exception + public void testNonMatchingMessagesHandledCorrectly() throws Exception { - AMQConnection con = (AMQConnection) getConnection("guest", "guest"); - - // Setup Topic - AMQTopic topic = new AMQTopic(con, "testNoLocal"); - - TopicSession session = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE); + final String topicName = getName(); + final String clientId = "clientId" + topicName; + final Connection con1 = getConnection(); + final Session session1 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Topic topic1 = session1.createTopic(topicName); + final AMQQueue internalNameOnBroker = new AMQQueue("amq.topic", "clientid" + ":" + clientId); // Setup subscriber with selector - TopicSubscriber selector = session.createSubscriber(topic, "Selector = 'select'", false); - TopicPublisher publisher = session.createPublisher(topic); + final TopicSubscriber subscriberWithSelector = session1.createDurableSubscriber(topic1, clientId, "Selector = 'select'", false); + final MessageProducer publisher = session1.createProducer(topic1); - con.start(); - TextMessage m; - TextMessage message; + con1.start(); // Send non-matching message - message = session.createTextMessage("non-matching 1"); - publisher.publish(message); - session.commit(); + final Message sentMessage = session1.createTextMessage("hello"); + sentMessage.setStringProperty("Selector", "nonMatch"); + publisher.send(sentMessage); - // Send and consume matching message - message = session.createTextMessage("hello"); - message.setStringProperty("Selector", "select"); - - publisher.publish(message); - session.commit(); - - m = (TextMessage) selector.receive(1000); - assertNotNull("should have received message", m); - assertEquals("Message contents were wrong", "hello", m.getText()); - - // Send non-matching message - message = session.createTextMessage("non-matching 2"); - publisher.publish(message); - session.commit(); + // Try to consume non-message, expect this to fail. + final Message message1 = subscriberWithSelector.receive(1000); + assertNull("should not have received message", message1); + subscriberWithSelector.close(); - // Assert queue count is 0 - long depth = ((AMQTopicSessionAdaptor) session).getSession().getQueueDepth(topic); - assertEquals("Queue depth was wrong", 0, depth); + session1.close(); + // Now verify queue depth on broker. + final Session session2 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE); + final long depth = ((AMQSession) session2).getQueueDepth(internalNameOnBroker); + assertEquals("Expected queue depth of zero", 0, depth); } } diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes index d05f42e4b7..39b4d542db 100755 --- a/qpid/java/test-profiles/CPPExcludes +++ b/qpid/java/test-profiles/CPPExcludes @@ -64,9 +64,6 @@ org.apache.qpid.test.unit.client.connection.ConnectionTest#testDefaultExchanges // 0-10 c++ broker in cpp.testprofile is started with no auth so won't pass this test org.apache.qpid.test.unit.client.connection.ConnectionTest#testPasswordFailureConnection -// c++ broker doesn't do selectors, so this will fail -org.apache.qpid.test.unit.topic.TopicSessionTest#testNonMatchingMessagesDoNotFillQueue - // InVM Broker tests org.apache.qpid.test.client.timeouts.SyncWaitDelayTest#* diff --git a/qpid/java/test-profiles/JavaPre010Excludes b/qpid/java/test-profiles/JavaPre010Excludes index 8344cd98c2..68feaf1e2b 100644 --- a/qpid/java/test-profiles/JavaPre010Excludes +++ b/qpid/java/test-profiles/JavaPre010Excludes @@ -28,6 +28,7 @@ org.apache.qpid.test.client.message.JMSDestinationTest#testReceiveResend org.apache.qpid.test.client.message.JMSDestinationTest#testGetDestinationWithCustomExchange // The new addressing based syntax is not supported for AMQP 0-8/0-9 versions +org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#* org.apache.qpid.test.client.destination.AddressBasedDestinationTest#* org.apache.qpid.test.client.queue.QueuePolicyTest#testRingPolicy org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy diff --git a/qpid/java/test-profiles/python_tests/Java010PythonExcludes b/qpid/java/test-profiles/python_tests/Java010PythonExcludes index 31d2a8affc..10e6298634 100644 --- a/qpid/java/test-profiles/python_tests/Java010PythonExcludes +++ b/qpid/java/test-profiles/python_tests/Java010PythonExcludes @@ -1,22 +1,115 @@ -// -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -// - -// QPID-3477: Java broker does not handle rejection code specified in test +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +###### Feature not supported in Java Broker ###### + +#The broker does not support DTX +qpid_tests.broker_0_10.dtx.* + +#The broker does not support message groups +qpid_tests.broker_0_10.msg_groups.* + +#The broker does not have the appropriate QMF support +qpid_tests.broker_0_10.management.* + +#The broker does not use the same threshold alerting system (or the QMF support needed for the tests) +qpid_tests.broker_0_10.threshold.* + +#The broker does not support the policy extension +qpid_tests.broker_0_10.extensions.ExtensionTests.test_policy_* + +#The broker does not support the timed-autodelete extension +qpid_tests.broker_0_10.extensions.ExtensionTests.test_timed_autodelete + +#The broker does not support ring queues, fairshare, or the priority alias +qpid_tests.broker_0_10.priority.PriorityTests.test_ring_queue* +qpid_tests.broker_0_10.priority.PriorityTests.test_fairshare* +qpid_tests.broker_0_10.priority.PriorityTests.test_prioritised_delivery_with_alias + + +###### Behavioural differences between Java & CPP Broker ###### + +#QPID-3587 Java broker does not alter queue counts until msgs are accepted. +qpid_tests.broker_0_10.message.MessageTests.test_ack +qpid_tests.broker_0_10.message.MessageTests.test_acquire +qpid_tests.broker_0_10.message.MessageTests.test_acquire_with_no_accept_and_credit_flow + +#QPID-3588 Java broker sets expiration and doesnt pass TTL on to consumer +qpid.tests.messaging.message.MessageEchoTests.testProperties + +#QPID-3589 Difference in exception text message causes test to fail +qpid.tests.messaging.endpoints.AddressTests.testDeleteSpecial + +#QPID-3590 Java broker does not support null value for routing key +qpid.tests.messaging.endpoints.SessionTests.testDoubleCommit + + +###### Java Broker defects ###### + +#QPID-3591 Fails due to bytes credit issue +qpid_tests.broker_0_10.message.MessageTests.test_credit_flow_bytes +qpid_tests.broker_0_10.message.MessageTests.test_window_flow_bytes + +#QPID-3592 Fails to receive more messages after restart +qpid_tests.broker_0_10.message.MessageTests.test_window_stop + +#QPID-3539 Tests fail because is incorrectly being done per session and not connection +qpid_tests.broker_0_10.message.MessageTests.test_no_local +qpid_tests.broker_0_10.message.MessageTests.test_no_local_awkward +qpid_tests.broker_0_10.message.MessageTests.test_no_local_exclusive_subscribe + +#QPID-3593 Priority Queue test failures +qpid_tests.broker_0_10.priority.PriorityTests.test_browsing +qpid_tests.broker_0_10.priority.PriorityTests.test_prioritised_delivery_1 +qpid_tests.broker_0_10.priority.PriorityTests.test_prioritised_delivery_2 +qpid_tests.broker_0_10.priority.PriorityTests.test_requeue + +#QPID-3594 exclusive queues problem +qpid_tests.broker_0_10.queue.QueueTests.test_declare_exclusive + +#QPID-3477: Java broker does not handle rejection code specified in test qpid.tests.messaging.endpoints.SessionTests.testReject +#QPID-3595 Alternate Exchanges support requires work to be spec compliant. +qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_delete_while_used_by_queue +qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_modify_existing_exchange_alternate +qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_queue_autodelete +qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_queue_delete +qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_queue_delete_loop +qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_queue_delete_no_match +qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_reject_no_match +qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_add_alternate_to_exchange + +#QPID-3596 Broker does not validate for reserved exchange names on create/bind. +qpid_tests.broker_0_10.exchange.DeclareMethodExchangeFieldReservedRuleTests.* + +#QPID-3597 Headers exchange issues +qpid_tests.broker_0_10.exchange.HeadersExchangeTests.* +qpid_tests.broker_0_10.queue.QueueTests.test_unbind_headers +qpid_tests.broker_0_10.exchange.RecommendedTypesRuleTests.testHeaders +qpid_tests.broker_0_10.exchange.RequiredInstancesRuleTests.testAmqMatch +qpid_tests.broker_0_10.query.QueryTests.test_exchange_bound_header + +#QPID-3598 Fanout exchange issues +qpid_tests.broker_0_10.query.QueryTests.test_exchange_bound_fanout + +#QPID-3599 Tests fail due to differences in expected message Redelivered status +qpid.tests.messaging.endpoints.SessionTests.testCommitAck +qpid.tests.messaging.endpoints.SessionTests.testRelease +qpid.tests.messaging.endpoints.SessionTests.testRollback -- cgit v1.2.1