summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Donald Kennedy <grkvlt@apache.org>2010-12-06 16:05:46 +0000
committerAndrew Donald Kennedy <grkvlt@apache.org>2010-12-06 16:05:46 +0000
commit80ba5c5efdc23e3922b8f8f5152ceeaefa6951b6 (patch)
tree2eb2141eb77d43701e718b5b5ab1cbd07401015f
parent1d44d6e7a3369fb7773ba50d02c3baa8955da382 (diff)
downloadqpid-python-grkvlt-network-20101013.tar.gz
Attempt one at merge from r1021441:HEADgrkvlt-network-20101013
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/grkvlt-network-20101013@1042697 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/AbstractConfiguration.java2
-rw-r--r--qpid/java/broker-plugins/experimental/info/build.properties18
-rw-r--r--qpid/java/broker-plugins/experimental/shutdown/MANIFEST.MF15
-rw-r--r--qpid/java/broker-plugins/experimental/shutdown/build.xml32
-rw-r--r--qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Activator.java71
-rw-r--r--qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Shutdown.java104
-rw-r--r--qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/ShutdownMBean.java47
-rwxr-xr-xqpid/java/broker-plugins/experimental/shutdown/src/main/java/shutdown.bnd (renamed from qpid/java/broker/bin/qpid.start)8
-rw-r--r--qpid/java/broker/build.xml6
-rw-r--r--qpid/java/broker/etc/config.xml7
-rw-r--r--qpid/java/broker/etc/log4j.xml6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerInstance.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionConfiguration.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/management/AMQManagedObject.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageReference.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java55
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java4
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiver.java2
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java43
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java47
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java2
-rw-r--r--qpid/java/build.deps31
-rw-r--r--qpid/java/client/README.txt51
-rw-r--r--qpid/java/client/build.xml20
-rw-r--r--qpid/java/client/example/bin/README.txt11
-rw-r--r--qpid/java/client/example/bin/set_classpath.bat49
-rwxr-xr-xqpid/java/client/example/bin/set_classpath.sh82
-rw-r--r--qpid/java/client/example/source-jar.xml35
-rw-r--r--qpid/java/client/example/src/main/java/README.txt264
-rwxr-xr-xqpid/java/client/src/main/java/client.bnd29
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java20
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java8
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java20
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java20
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java40
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java1
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java3
-rw-r--r--qpid/java/common.xml2
-rwxr-xr-xqpid/java/common/src/main/java/common.bnd21
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java34
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java60
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java35
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java8
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java19
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java69
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java39
-rw-r--r--qpid/java/lib/slf4j-api-1.4.0.jarbin13095 -> 0 bytes
-rw-r--r--qpid/java/lib/slf4j-api-1.6.1.jarbin0 -> 25496 bytes
-rw-r--r--qpid/java/lib/slf4j-log4j12-1.4.0.jarbin7132 -> 0 bytes
-rw-r--r--qpid/java/lib/slf4j-log4j12-1.6.1.jarbin0 -> 9753 bytes
-rw-r--r--qpid/java/management/client/README.txt2
-rw-r--r--qpid/java/management/client/bin/qman-wsdm-start.cmd6
-rw-r--r--qpid/java/management/client/bin/qman-wsdm-start.sh4
-rw-r--r--qpid/java/management/client/build.xml10
-rw-r--r--qpid/java/management/common/src/main/java/management-common.bnd29
-rw-r--r--qpid/java/management/eclipse-plugin/META-INF/MANIFEST.MF2
-rw-r--r--qpid/java/management/eclipse-plugin/src/main/resources/jmxremote.sasl-plugin/MANIFEST.MF1
-rw-r--r--qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/Info.plist20
-rw-r--r--qpid/java/release-docs/RELEASE_NOTES.txt22
-rw-r--r--qpid/java/systests/etc/config-systests-settings.xml2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java3
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java50
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java3
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java9
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java5
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java22
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java19
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java11
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java45
-rw-r--r--qpid/java/test-profiles/08StandaloneExcludes23
-rwxr-xr-xqpid/java/test-profiles/CPPExcludes19
-rw-r--r--qpid/java/test-profiles/CPPNoPrefetchExcludes19
-rw-r--r--qpid/java/test-profiles/CPPPrefetchExcludes19
-rw-r--r--qpid/java/test-profiles/CPPTransientExcludes19
-rw-r--r--qpid/java/test-profiles/Excludes21
-rwxr-xr-xqpid/java/test-profiles/Java010Excludes22
-rw-r--r--qpid/java/test-profiles/JavaExcludes19
-rw-r--r--qpid/java/test-profiles/JavaInVMExcludes19
-rw-r--r--qpid/java/test-profiles/JavaPersistentExcludes19
-rw-r--r--qpid/java/test-profiles/JavaStandaloneExcludes19
-rw-r--r--qpid/java/test-profiles/JavaTransientExcludes19
-rw-r--r--qpid/java/test-profiles/XAExcludes19
-rw-r--r--qpid/java/test-profiles/cpp.async.excludes19
-rw-r--r--qpid/java/test-profiles/cpp.async.testprofile18
-rw-r--r--qpid/java/test-profiles/cpp.cluster.testprofile18
-rw-r--r--qpid/java/test-profiles/cpp.excludes19
-rw-r--r--qpid/java/test-profiles/cpp.noprefetch.testprofile18
-rw-r--r--qpid/java/test-profiles/cpp.ssl.excludes19
-rw-r--r--qpid/java/test-profiles/cpp.ssl.testprofile18
-rw-r--r--qpid/java/test-profiles/cpp.testprofile18
-rw-r--r--qpid/java/test-profiles/default.0.10.testprofile2
-rw-r--r--qpid/java/test-profiles/default.testprofile21
-rw-r--r--qpid/java/test-profiles/java-derby.0.10.testprofile18
-rw-r--r--qpid/java/test-profiles/java-derby.testprofile18
-rw-r--r--qpid/java/test-profiles/java.0.10.testprofile18
-rw-r--r--qpid/java/test-profiles/java.testprofile18
-rw-r--r--qpid/java/test-profiles/test_resources/ssl/app1.crt21
-rw-r--r--qpid/java/test-profiles/test_resources/ssl/app1.req14
-rw-r--r--qpid/java/test-profiles/test_resources/ssl/app2.crt21
-rw-r--r--qpid/java/test-profiles/test_resources/ssl/app2.req14
-rw-r--r--qpid/java/test-profiles/test_resources/ssl/keystore.jksbin4226 -> 4296 bytes
-rwxr-xr-xqpid/java/tools/bin/qpid-python-testkit (renamed from qpid/java/testkit/bin/qpid-python-testkit)6
-rw-r--r--qpid/java/tools/bin/set-testkit-env.sh (renamed from qpid/java/testkit/bin/setenv.sh)10
-rwxr-xr-xqpid/java/tools/bin/testkit.py278
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java (renamed from qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java)0
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java (renamed from qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java)0
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java216
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java (renamed from qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java)0
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java370
123 files changed, 2453 insertions, 792 deletions
diff --git a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/AbstractConfiguration.java b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/AbstractConfiguration.java
index a684e52ce4..e8e630842c 100644
--- a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/AbstractConfiguration.java
+++ b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/AbstractConfiguration.java
@@ -61,7 +61,7 @@ public abstract class AbstractConfiguration implements ConfigurationFile
public RuleSet reload()
{
RuleSet oldRules = _config;
-
+
try
{
RuleSet newRules = load();
diff --git a/qpid/java/broker-plugins/experimental/info/build.properties b/qpid/java/broker-plugins/experimental/info/build.properties
index ca85cb7b66..bdbbe1c2af 100644
--- a/qpid/java/broker-plugins/experimental/info/build.properties
+++ b/qpid/java/broker-plugins/experimental/info/build.properties
@@ -1,3 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
source.. = src/
output.. = bin/
bin.includes = META-INF/,\
diff --git a/qpid/java/broker-plugins/experimental/shutdown/MANIFEST.MF b/qpid/java/broker-plugins/experimental/shutdown/MANIFEST.MF
new file mode 100644
index 0000000000..49e90c6aad
--- /dev/null
+++ b/qpid/java/broker-plugins/experimental/shutdown/MANIFEST.MF
@@ -0,0 +1,15 @@
+Manifest-Version: 1.0
+Bundle-ManifestVersion: 2
+Bundle-Name: Experimental Shutdown
+Bundle-Description: Experimental Qpid Broker Shutdown Plugin
+Bundle-License: http://www.apache.org/licenses/LICENSE-2.0.txt
+Bundle-DocURL: http://qpid.apache.org/
+Bundle-SymbolicName: broker-plugins-experimental-shutdown;singleton:=true
+Bundle-Version: 1.0.0
+Bundle-Activator: org.apache.qpid.shutdown.Activator
+Import-Package: javax.management;resolution:=optional,
+ org.apache.log4j,
+ org.osgi.framework
+Bundle-RequiredExecutionEnvironment: J2SE-1.5
+Bundle-ActivationPolicy: lazy
+
diff --git a/qpid/java/broker-plugins/experimental/shutdown/build.xml b/qpid/java/broker-plugins/experimental/shutdown/build.xml
new file mode 100644
index 0000000000..ec4fce374e
--- /dev/null
+++ b/qpid/java/broker-plugins/experimental/shutdown/build.xml
@@ -0,0 +1,32 @@
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<project name="AMQ Broker Shutdown Plugin" default="build">
+
+ <property name="module.depends" value="common broker broker-plugins"/>
+ <property name="module.test.depends" value="test broker/test management/common client systests"/>
+ <property name="module.manifest" value="MANIFEST.MF"/>
+ <property name="module.plugin" value="true"/>
+
+ <import file="../../../module.xml"/>
+
+ <target name="bundle" depends="bundle-tasks"/>
+
+</project>
diff --git a/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Activator.java b/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Activator.java
new file mode 100644
index 0000000000..ad5e7707b6
--- /dev/null
+++ b/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Activator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.shutdown;
+
+import java.lang.management.ManagementFactory;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.log4j.Logger;
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+
+public class Activator implements BundleActivator
+{
+ private static final Logger _logger = Logger.getLogger(Activator.class);
+
+ private static final String SHUTDOWN_MBEAN_NAME = "org.apache.qpid:type=ShutdownMBean";
+
+ /** @see org.osgi.framework.BundleActivator#start(org.osgi.framework.BundleContext) */
+ public void start(BundleContext ctx) throws Exception {
+ Shutdown shutdown = new Shutdown();
+ if (ctx != null)
+ {
+ ctx.registerService(ShutdownMBean.class.getName(), shutdown, null);
+ }
+
+ // MBean registration
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ ObjectName name = new ObjectName(SHUTDOWN_MBEAN_NAME);
+ mbs.registerMBean(shutdown, name);
+
+ _logger.info("Shutdown plugin MBean registered");
+ }
+
+ /** @see org.osgi.framework.BundleActivator#stop(org.osgi.framework.BundleContext) */
+ public void stop(BundleContext ctx) throws Exception
+ {
+ // Unregister MBean
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ ObjectName name = new ObjectName(SHUTDOWN_MBEAN_NAME);
+ try
+ {
+ mbs.unregisterMBean(name);
+ }
+ catch (InstanceNotFoundException e)
+ {
+ //ignore
+ }
+
+ _logger.info("Shutdown plugin MBean unregistered");
+ }
+}
diff --git a/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Shutdown.java b/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Shutdown.java
new file mode 100644
index 0000000000..9a6f85fe9c
--- /dev/null
+++ b/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Shutdown.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.shutdown;
+
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Implementation of the JMX broker shutdown plugin.
+ */
+public class Shutdown implements ShutdownMBean
+{
+ private static final Logger _logger = Logger.getLogger(Shutdown.class);
+
+ private static final String FORMAT = "yyyyy/MM/dd hh:mm:ss";
+ private static final int THREAD_COUNT = 1;
+ private static final ScheduledExecutorService EXECUTOR = new ScheduledThreadPoolExecutor(THREAD_COUNT);
+
+ private final Runnable _shutdown = new SystemExiter();
+
+ /** @see ShutdownMBean#shutdown() */
+ public void shutdown()
+ {
+ _logger.info("Shutting down at user's request");
+ shutdownBroker(0);
+ }
+
+ /** @see ShutdownMBean#shutdown(long) */
+ public void shutdown(long delay)
+ {
+ _logger.info("Scheduled broker shutdown after " + delay + "ms");
+ shutdownBroker(delay);
+ }
+
+ /** @see ShutdownMBean#shutdownAt(String) */
+ public void shutdownAt(String when)
+ {
+ Date date;
+ DateFormat df = new SimpleDateFormat(FORMAT);
+ try
+ {
+ date = df.parse(when);
+ }
+ catch (ParseException e)
+ {
+ _logger.error("Invalid date \"" + when + "\": expecting " + FORMAT, e);
+ return;
+ }
+ _logger.info("Scheduled broker shutdown at " + when);
+ long now = System.currentTimeMillis();
+ long time = date.getTime();
+ if (time > now)
+ {
+ shutdownBroker(time - now);
+ }
+ else
+ {
+ shutdownBroker(0);
+ }
+ }
+
+ /**
+ * Submits the {@link SystemExiter} job to shutdown the broker.
+ */
+ private void shutdownBroker(long delay)
+ {
+ EXECUTOR.schedule(_shutdown, delay, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Shutting down the system in another thread to avoid JMX exceptions being thrown.
+ */
+ class SystemExiter implements Runnable
+ {
+ public void run()
+ {
+ System.exit(0);
+ }
+ }
+}
diff --git a/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/ShutdownMBean.java b/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/ShutdownMBean.java
new file mode 100644
index 0000000000..6294f869e9
--- /dev/null
+++ b/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/ShutdownMBean.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.shutdown;
+
+/**
+ * Shutdown plugin JMX MBean interface.
+ *
+ * Shuts the Qpid broker down via JMX.
+ */
+public interface ShutdownMBean
+{
+ /**
+ * Broker will be shut down immediately.
+ */
+ public void shutdown();
+
+ /**
+ * Broker will be shutdown after the specified delay
+ *
+ * @param delay the number of ms to wait
+ */
+ public void shutdown(long delay);
+
+ /**
+ * Broker will be shutdown at the specified date and time.
+ *
+ * @param when the date and time to shutdown
+ */
+ public void shutdownAt(String when);
+}
diff --git a/qpid/java/broker/bin/qpid.start b/qpid/java/broker-plugins/experimental/shutdown/src/main/java/shutdown.bnd
index e44b6083ea..6e005f5bdb 100755
--- a/qpid/java/broker/bin/qpid.start
+++ b/qpid/java/broker-plugins/experimental/shutdown/src/main/java/shutdown.bnd
@@ -1,4 +1,3 @@
-#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -18,4 +17,9 @@
# under the License.
#
-exec qpid-server -run:debug "$@" \ No newline at end of file
+ver: 0.9.0
+
+Bundle-SymbolicName: qpid-shutdown-plugin
+Bundle-Version: ${ver}
+Export-Package: *;version=${ver}
+Bundle-RequiredExecutionEnvironment: J2SE-1.5
diff --git a/qpid/java/broker/build.xml b/qpid/java/broker/build.xml
index f22972384d..edd71effaa 100644
--- a/qpid/java/broker/build.xml
+++ b/qpid/java/broker/build.xml
@@ -72,6 +72,12 @@
<fixcrlf srcdir="${module.release}/bin" fixlast="true" eol="dos" includes="*.bat"/>
</target>
+ <target name="release-bin-other" description="copy broker-plugins into module release">
+ <copy todir="${module.release}/lib/plugins" failonerror="true">
+ <fileset dir="${build.lib}/plugins"/>
+ </copy>
+ </target>
+
<target name="release-bin" depends="release-bin-tasks"/>
</project>
diff --git a/qpid/java/broker/etc/config.xml b/qpid/java/broker/etc/config.xml
index 87101678c0..d9677c9cf6 100644
--- a/qpid/java/broker/etc/config.xml
+++ b/qpid/java/broker/etc/config.xml
@@ -53,9 +53,10 @@
<enabled>true</enabled>
<jmxport>8999</jmxport>
<ssl>
- <enabled>true</enabled>
- <!-- Update below path to your keystore location, eg ${conf}/qpid.keystore -->
- <keyStorePath>${prefix}/../test-profiles/test_resources/ssl/keystore.jks</keyStorePath>
+ <enabled>false</enabled>
+ <!-- Update below path to your keystore location, or run the bin/create-example-ssl-stores(.sh|.bat)
+ script from within the etc/ folder to generate an example store with self-signed cert -->
+ <keyStorePath>${conf}/qpid.keystore</keyStorePath>
<keyStorePassword>password</keyStorePassword>
</ssl>
</management>
diff --git a/qpid/java/broker/etc/log4j.xml b/qpid/java/broker/etc/log4j.xml
index 381173da15..967b0933b4 100644
--- a/qpid/java/broker/etc/log4j.xml
+++ b/qpid/java/broker/etc/log4j.xml
@@ -68,7 +68,7 @@
<param name="backupFilesToPath" value="${QPID_WORK}/backup/log"/>
<layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
+ <param name="ConversionPattern" value="%d %-5p [%t] (%F:%L) - %m%n"/>
</layout>
</appender>
@@ -77,13 +77,13 @@
<param name="Append" value="false"/>
<layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
+ <param name="ConversionPattern" value="%d %-5p [%t] (%F:%L) - %m%n"/>
</layout>
</appender>
<appender class="org.apache.log4j.ConsoleAppender" name="STDOUT">
<layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
+ <param name="ConversionPattern" value="%d %-5p [%t] (%F:%L) - %m%n"/>
</layout>
</appender>
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerInstance.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerInstance.java
index ad4e40a562..8150cd7404 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerInstance.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerInstance.java
@@ -255,6 +255,7 @@ public class BrokerInstance
private void configureLogging(File logConfigFile, int logWatchTime) throws Exception
{
+ _logger.info("configuring logging using file " + logConfigFile.getName());
if (logConfigFile.exists() && logConfigFile.canRead())
{
CurrentActor.get().message(BrokerMessages.LOG_CONFIG(logConfigFile.getAbsolutePath()));
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java
index 0e03e33be8..7dfe9ff49a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java
@@ -29,6 +29,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import sun.misc.Unsafe;
+
public class ConfigStore
{
private ConcurrentHashMap<ConfigObjectType, ConcurrentHashMap<UUID, ConfiguredObject>> _typeMap =
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java
index c4cad1e5c9..18f41588d5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java
@@ -229,13 +229,12 @@ public abstract class ConfigurationPlugin
return getListValue(property, Collections.<String>emptyList());
}
- @SuppressWarnings("unchecked")
protected List<String> getListValue(String property, List<String> defaultValue)
{
- return (List<String>) _configuration.getList(property, defaultValue);
+ return _configuration.getList(property, defaultValue);
}
- /// Validation Helpers
+ // Validation Helpers
protected boolean contains(String property)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionConfiguration.java
index c5fbb6efd9..7a2632d923 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionConfiguration.java
@@ -85,8 +85,5 @@ public class SlowConsumerDetectionConfiguration extends ConfigurationPlugin
}
}
- System.out.println("Configured SCDC");
- System.out.println("Delay:" + getDelay());
- System.out.println("TimeUnit:" + getTimeUnit());
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
index c06305ee4e..caec2c1324 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
@@ -57,20 +57,21 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable
{
connection.close(cause, message);
}
- catch (AMQException e)
+ catch (Exception e)
{
- _logger.warn("Error closing connection:" + e.getMessage());
+ _logger.warn("Error closing connection: " + e.getMessage());
+ deregisterConnection(connection);
}
}
- public void registerConnection(AMQConnectionModel connnection)
+ public void registerConnection(AMQConnectionModel connection)
{
- _registry.add(connnection);
+ _registry.add(connection);
}
- public void deregisterConnection(AMQConnectionModel connnection)
+ public void deregisterConnection(AMQConnectionModel connection)
{
- _registry.remove(connnection);
+ _registry.remove(connection);
}
@Override
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AMQManagedObject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AMQManagedObject.java
index c4ffcd26bf..8bce180784 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AMQManagedObject.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AMQManagedObject.java
@@ -20,9 +20,13 @@
*/
package org.apache.qpid.server.management;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.qpid.server.logging.actors.ManagementActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.thread.Threading;
import javax.management.ListenerNotFoundException;
import javax.management.MBeanInfo;
@@ -45,12 +49,13 @@ public abstract class AMQManagedObject extends DefaultManagedObject
/**
* broadcaster support class
*/
- protected NotificationBroadcasterSupport _broadcaster = new NotificationBroadcasterSupport();
+ protected NotificationBroadcasterSupport _broadcaster = new NotificationBroadcasterSupport(
+ Executors.newCachedThreadPool(Threading.getThreadFactory()));
/**
* sequence number for notifications
*/
- protected long _notificationSequenceNumber = 0;
+ protected AtomicLong _notificationSequenceNumber = new AtomicLong(0);
protected MBeanInfo _mbeanInfo;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageReference.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageReference.java
index 055403ff08..399f8f9327 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageReference.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageReference.java
@@ -20,13 +20,12 @@
*/
package org.apache.qpid.server.message;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.atomic.AtomicBoolean;
public abstract class MessageReference<M extends ServerMessage>
{
- private static final AtomicReferenceFieldUpdater<MessageReference, ServerMessage> _messageUpdater =
- AtomicReferenceFieldUpdater.newUpdater(MessageReference.class, ServerMessage.class,"_message");
+ private final AtomicBoolean _released = new AtomicBoolean(false);
private volatile M _message;
@@ -47,10 +46,12 @@ public abstract class MessageReference<M extends ServerMessage>
public void release()
{
- M message = (M) _messageUpdater.getAndSet(this,null);
- if(message != null)
+ if(!_released.getAndSet(true))
{
- onRelease(message);
+ if(_message != null)
+ {
+ onRelease(_message);
+ }
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java
index b61da12b05..a6bab017a1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java
@@ -63,6 +63,7 @@ public class PluginManager implements Closeable
private static final Logger _logger = Logger.getLogger(PluginManager.class);
private static final int FELIX_STOP_TIMEOUT = 30000;
+ private static final String QPID_VER_SUFFIX = "version=0.9,";
private Framework _felix;
@@ -133,33 +134,33 @@ public class PluginManager implements Closeable
"org.osgi.service.startlevel; version=1.0.0," +
"org.osgi.service.url; version=1.0.0," +
"org.osgi.util.tracker; version=1.0.0," +
- "org.apache.qpid.junit.extensions.util; version=0.7," +
- "org.apache.qpid; version=0.7," +
- "org.apache.qpid.common; version=0.7," +
- "org.apache.qpid.exchange; version=0.7," +
- "org.apache.qpid.framing; version=0.7," +
- "org.apache.qpid.management.common.mbeans.annotations; version=0.7," +
- "org.apache.qpid.protocol; version=0.7," +
- "org.apache.qpid.server.binding; version=0.7," +
- "org.apache.qpid.server.configuration; version=0.7," +
- "org.apache.qpid.server.configuration.plugins; version=0.7," +
- "org.apache.qpid.server.configuration.management; version=0.7," +
- "org.apache.qpid.server.exchange; version=0.7," +
- "org.apache.qpid.server.logging; version=0.7," +
- "org.apache.qpid.server.logging.actors; version=0.7," +
- "org.apache.qpid.server.logging.subjects; version=0.7," +
- "org.apache.qpid.server.management; version=0.7," +
- "org.apache.qpid.server.persistent; version=0.7," +
- "org.apache.qpid.server.plugins; version=0.7," +
- "org.apache.qpid.server.protocol; version=0.7," +
- "org.apache.qpid.server.queue; version=0.7," +
- "org.apache.qpid.server.registry; version=0.7," +
- "org.apache.qpid.server.security; version=0.7," +
- "org.apache.qpid.server.security.access; version=0.7," +
- "org.apache.qpid.server.security.access.plugins; version=0.7," +
- "org.apache.qpid.server.virtualhost; version=0.7," +
- "org.apache.qpid.server.virtualhost.plugins; version=0.7," +
- "org.apache.qpid.util; version=0.7," +
+ "org.apache.qpid.junit.extensions.util; " + QPID_VER_SUFFIX +
+ "org.apache.qpid; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.common; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.exchange; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.framing; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.management.common.mbeans.annotations; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.protocol; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.binding; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.configuration; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.configuration.plugins; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.configuration.management; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.exchange; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.logging; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.logging.actors; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.logging.subjects; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.management; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.persistent; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.plugins; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.protocol; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.queue; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.registry; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.security; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.security.access; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.security.access.plugins; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.virtualhost; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.virtualhost.plugins; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.util; " + QPID_VER_SUFFIX +
"org.apache.commons.configuration; version=1.0.0," +
"org.apache.commons.lang; version=1.0.0," +
"org.apache.commons.lang.builder; version=1.0.0," +
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 1185557d8f..c339bd9f90 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -361,7 +361,6 @@ public class AMQProtocolEngine implements Receiver<java.nio.ByteBuffer>, Managab
mechanisms.getBytes(),
locales.getBytes());
_sender.send(responseBody.generateFrame(0).toNioByteBuffer());
-
}
catch (AMQException e)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
index 77101e7d58..b009b6f522 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
@@ -135,7 +135,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
public String getVersion()
{
- return (_protocolSession.getClientVersion() == null) ? null : _protocolSession.getClientVersion().toString();
+ return _protocolSession.getClientVersion();
}
public Date getLastIoTime()
@@ -324,7 +324,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
public void notifyClients(String notificationMsg)
{
Notification n =
- new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber,
+ new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, _notificationSequenceNumber.getAndIncrement(),
System.currentTimeMillis(), notificationMsg);
_broadcaster.sendNotification(n);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiver.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiver.java
index 2fdf27d1aa..3a5bc7de48 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiver.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiver.java
@@ -54,7 +54,7 @@ public class BrokerReceiver implements Receiver<java.nio.ByteBuffer>, LogSubject
private IApplicationRegistry _appRegistry;
private volatile Receiver<java.nio.ByteBuffer> _delegate = new SelfDelegateProtocolEngine();
-
+
public BrokerReceiver(IApplicationRegistry appRegistry,
String fqdn,
Set<VERSION> supported,
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
index 92b0236b6c..3befd43d89 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
@@ -38,7 +38,7 @@ import org.apache.qpid.transport.network.NetworkConnection;
public class ProtocolEngine_0_10 extends InputHandler implements ConnectionConfig
{
- public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
+ public static final int MAX_FRAME_SIZE = Integer.getInteger("qpid.maxFrameSize", 64 * 1024 - 1);
private NetworkConnection _network;
private ServerConnection _connection;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index b5294b6d2f..784582b83e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -57,7 +57,44 @@ import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+
+import javax.management.JMException;
+import javax.management.MBeanException;
+import javax.management.MBeanNotificationInfo;
+import javax.management.Notification;
+import javax.management.OperationsException;
+import javax.management.monitor.MonitorNotification;
+import javax.management.openmbean.ArrayType;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.transport.MessageProperties;
/**
* AMQQueueMBean is the management bean for an {@link AMQQueue}.
@@ -298,7 +335,6 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
*/
public void checkForNotification(ServerMessage msg) throws AMQException
{
-
final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks();
if(!notificationChecks.isEmpty())
@@ -317,7 +353,6 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
}
}
}
-
}
/**
@@ -330,7 +365,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
notificationMsg = notification.name() + " " + notificationMsg;
_lastNotification =
- new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber,
+ new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, _notificationSequenceNumber.getAndIncrement(),
System.currentTimeMillis(), notificationMsg);
_broadcaster.sendNotification(_lastNotification);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index f1407b8770..580fe8e834 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -55,6 +55,7 @@ import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticat
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.network.IncomingNetworkTransport;
import org.apache.qpid.transport.network.NetworkTransport;
/**
@@ -71,7 +72,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
protected final ServerConfiguration _configuration;
- protected final Map<Integer, NetworkTransport> _transports = new HashMap<Integer, NetworkTransport>();
+ protected final Map<Integer, IncomingNetworkTransport> _transports = new HashMap<Integer, IncomingNetworkTransport>();
protected ManagedObjectRegistry _managedObjectRegistry;
@@ -374,12 +375,12 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
try
{
transport.close();
+ CurrentActor.get().message(BrokerMessages.SHUTTING_DOWN(transport.getAddress().toString(), port));
}
catch (Throwable e)
{
_logger.error("Unable to close network driver due to:" + e.getMessage());
}
- CurrentActor.get().message(BrokerMessages.SHUTTING_DOWN(transport.getAddress().toString(), port));
}
}
}
@@ -389,7 +390,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
return _configuration;
}
- public void registerTransport(int port, NetworkTransport transport)
+ public void registerTransport(int port, IncomingNetworkTransport transport)
{
synchronized (_transports)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
index 9d138055bf..3357a42e68 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
@@ -36,6 +36,7 @@ import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.network.IncomingNetworkTransport;
import org.apache.qpid.transport.network.NetworkTransport;
public interface IApplicationRegistry
@@ -81,7 +82,7 @@ public interface IApplicationRegistry
/**
* Register any network transports for this registry
*/
- void registerTransport(int port, NetworkTransport transport);
+ void registerTransport(int port, IncomingNetworkTransport transport);
public UUID getBrokerId();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index 0865165925..2e694b24ea 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -134,6 +134,8 @@ public class DerbyMessageStore implements MessageStore
private static final String DELETE_FROM_META_DATA = "DELETE FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
private static final String SELECT_ALL_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME;
+ private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
+
private LogSubject _logSubject;
private boolean _configured;
@@ -631,9 +633,9 @@ public class DerbyMessageStore implements MessageStore
}
catch (SQLException e)
{
- if (e.getSQLState().equalsIgnoreCase("XJ015"))
+ if (e.getSQLState().equalsIgnoreCase(DERBY_SINGLE_DB_SHUTDOWN_CODE))
{
- //XJ015 is expected and represents a clean shutdown, do nothing.
+ //expected and represents a clean shutdown of this database only, do nothing.
}
else
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index 9952700ae1..511d8e7fed 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -906,10 +906,9 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
public String toLogString()
{
String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(),
- _queue.getNameShortString());
- String result = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) + "("
- // queueString is "vh(/{0})/qu({1}) " so need to trim
- + queueInfo.substring(0, queueInfo.length() - 1) + ")" + "] ";
+ _queue.getNameShortString());
+ String result = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) +
+ "(" + queueInfo.trim() + ")" + "] ";
return result;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index d6abee45d8..2439e607b1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -91,6 +91,10 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
if (state == State.CLOSED)
{
CurrentActor.get().message(this, ConnectionMessages.CLOSE());
+ if (_virtualHost != null)
+ {
+ _virtualHost.getConnectionRegistry().deregisterConnection(this);
+ }
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java
index 2db1944cd1..9ba9e2f4a4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java
@@ -45,13 +45,13 @@ public abstract class HouseKeepingTask implements Runnable
{
// Don't need to undo this as this is a thread pool thread so will
// always go through here before we do any real work.
- Thread.currentThread().setName(_name);
+ //Thread.currentThread().setName(_name); // XXX temporary
CurrentActor.set(new AbstractActor(_rootLogger)
{
@Override
public String getLogMessage()
{
- return _name;
+ return _name + " ";
}
});
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index c54173a281..1038e8fbd0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.management.NotCompliantMBeanException;
@@ -215,6 +216,25 @@ public class VirtualHostImpl implements VirtualHost
_connectionRegistry = new ConnectionRegistry();
_houseKeepingTasks = new ScheduledThreadPoolExecutor(_configuration.getHouseKeepingThreadCount());
+ _houseKeepingTasks.setThreadFactory(new ThreadFactory()
+ {
+ public Thread newThread(Runnable r)
+ {
+ Thread t = new Thread(r);
+ String name = "HouseKeeping";
+ StackTraceElement[] trace = Thread.currentThread().getStackTrace();
+ for (StackTraceElement elt : trace)
+ {
+ if (elt.getClassName().endsWith("Test"))
+ {
+ name += "-" + elt.getClassName();
+// break; // FIXME
+ }
+ }
+ t.setName(name);
+ return t;
+ }
+ });
_queueRegistry = new DefaultQueueRegistry(this);
@@ -248,6 +268,7 @@ public class VirtualHostImpl implements VirtualHost
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
_brokerMBean.register();
+
initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod());
}
@@ -275,12 +296,22 @@ public class VirtualHostImpl implements VirtualHost
}
catch (Exception e)
{
- _logger.error("Exception in housekeeping for queue: "
- + q.getNameShortString().toString(), e);
- //Don't throw exceptions as this will stop the
- // house keeping task from running.
+ _logger.error("Exception in housekeeping for queue: " + q.getName(), e);
+ // Don't throw exceptions as this will stop the task from running.
}
}
+ }
+ }
+
+ class CheckTransactionsTask extends HouseKeepingTask
+ {
+ public CheckTransactionsTask(VirtualHost vhost)
+ {
+ super(vhost);
+ }
+
+ public void execute()
+ {
for (AMQConnectionModel connection : getConnectionRegistry().getConnections())
{
_logger.debug("Checking for long running open transactions on connection " + connection);
@@ -293,17 +324,19 @@ public class VirtualHostImpl implements VirtualHost
_configuration.getTransactionTimeoutOpenClose(),
_configuration.getTransactionTimeoutIdleWarn(),
_configuration.getTransactionTimeoutIdleClose());
- }
+ }
catch (Exception e)
{
_logger.error("Exception in housekeeping for connection: " + connection.toString(), e);
+ // Don't throw exceptions as this will stop the task from running.
}
}
}
}
- }
+ };
scheduleHouseKeepingTask(period, new ExpiredMessagesTask(this));
+ scheduleHouseKeepingTask(period, new CheckTransactionsTask(this));
Map<String, VirtualHostPluginFactory> plugins =
ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins();
@@ -368,13 +401,11 @@ public class VirtualHostImpl implements VirtualHost
_houseKeepingTasks.setCorePoolSize(newSize);
}
-
public int getHouseKeepingActiveCount()
{
return _houseKeepingTasks.getActiveCount();
}
-
private void initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception
{
String messageStoreClass = hostConfig.getMessageStoreClass();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java
index 12206013eb..3346f80b7c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java
@@ -83,9 +83,11 @@ public class ConfiguredQueueBindingListener implements BindingListener
if (config != null)
{
_cache.add(queue);
+ _log.error("=== SCD === ADD " + queue.getName());
}
else
{
+ _log.error("=== SCD === REMOVE " + queue.getName());
_cache.remove(queue);
}
}
diff --git a/qpid/java/build.deps b/qpid/java/build.deps
index 815ff35058..b56c79f588 100644
--- a/qpid/java/build.deps
+++ b/qpid/java/build.deps
@@ -1,3 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+backport-util-concurrent=lib/backport-util-concurrent-2.2.jar
+
commons-beanutils-core=lib/commons-beanutils-core-1.8.0.jar
commons-cli=lib/commons-cli-1.0.jar
commons-codec=lib/commons-codec-1.3.jar
@@ -22,8 +43,8 @@ log4j=lib/log4j-1.2.12.jar
mina-core=lib/mina-core-1.1.7.jar
mina-filter-ssl=lib/mina-filter-ssl-1.1.7.jar
-slf4j-api=lib/slf4j-api-1.4.0.jar
-slf4j-log4j=lib/slf4j-log4j12-1.4.0.jar
+slf4j-api=lib/slf4j-api-1.6.1.jar
+slf4j-log4j=lib/slf4j-log4j12-1.6.1.jar
xalan=lib/xalan-2.7.0.jar
@@ -75,9 +96,15 @@ felix.libs=${felix-framework}
commons-configuration.libs = ${commons-beanutils-core} ${commons-digester} \
${commons-codec} ${commons-lang} ${commons-collections} ${commons-configuration}
+<<<<<<< .working
common.libs=${slf4j-api} ${mina-core} ${mina-filter-ssl}
client.libs=${geronimo-jms} ${common.libs}
tools.libs=${commons-configuration.libs} ${broker.libs}
+=======
+common.libs=${slf4j-api} ${backport-util-concurrent} ${mina-core} ${mina-filter-ssl}
+client.libs=${geronimo-jms}
+tools.libs=${commons-configuration.libs} ${log4j}
+>>>>>>> .merge-right.r1042616
broker.libs=${commons-cli} ${commons-logging} ${log4j} ${slf4j-log4j} \
${xalan} ${felix.libs} ${derby-db} ${commons-configuration.libs}
diff --git a/qpid/java/client/README.txt b/qpid/java/client/README.txt
new file mode 100644
index 0000000000..57a98cc978
--- /dev/null
+++ b/qpid/java/client/README.txt
@@ -0,0 +1,51 @@
+Documentation
+=============
+
+You can access documentation for the client via our website at:
+http://qpid.apache.org/documentation
+
+and via our wiki at:
+http://cwiki.apache.org/confluence/display/qpid/Qpid+Java+Documentation
+
+The client uses the Java Message Service (JMS) 1.1 API, information on which is
+widely available using your favoured search engine.
+
+
+Running the Examples:
+=====================
+
+1. From the client Binary distribution:
+
+From the <installation path>/qpid-client-<version> directory, there are examples
+provided in source form in the example/src sub-directory. These are also
+provided in binary form in the example/lib directory in order that they can be
+run more easily.
+
+E.g, in order to run the Hello example, you would add the client+example library
+files to the java classpath and launch the example like follows:
+
+java -cp "lib/qpid-all.jar:example/lib/qpid-client-examples-<version>.jar" \
+ org.apache.qpid.example.Hello
+
+NOTE: The client uses the SL4FJ API for its logging. You must supply a logging
+implementation of your choice (eg Log4J) and its associated SLF4J binding, by
+also adding them to the Java classpath as well as the client libraries
+themselves. Failure to do so will result in a warning being output and use of
+NoOp logging by the client.
+
+More information on using SLF4J is available at http://www.slf4j.org/manual.html
+which details some of the supported logging implementations and their
+associated SLF4 bindings as available in the SLF4J distribution.
+
+
+
+2. From the Source distribution / repository:
+
+Run 'ant build' in the parent directory from where this file is stored, ie:
+<installation path>/qpid/java
+
+This will build the various Java modules, leaving binary .jar files output in:
+<installation path>/qpid/java/build/lib
+
+Taking the above the 'distribution directory', consult the README.txt file at:
+<installation path>/qpid/java/client/example/src/main/java
diff --git a/qpid/java/client/build.xml b/qpid/java/client/build.xml
index 3c6132dc5b..d52de8dca6 100644
--- a/qpid/java/client/build.xml
+++ b/qpid/java/client/build.xml
@@ -27,6 +27,9 @@
<import file="../module.xml"/>
+ <property name="example.src.dir" value="${project.root}/client/example/src/main/java" />
+ <property name="example.jar.file" value="${build.lib}/qpid-client-example-${project.version}.jar" />
+
<property name="output.dir" value="${module.precompiled}/org/apache/qpid/filter/selector"/>
<target name="precompile">
@@ -46,6 +49,23 @@
classpathref="module.class.path" packagenames="org.apache.qpid.jms"/>
</target>
+ <target name="release-bin-copy-examples">
+ <copy todir="${module.release}/example/src" failonerror="true">
+ <fileset dir="${example.src.dir}" excludes="runSample.sh README.txt" />
+ </copy>
+ <copy todir="${module.release}/example/lib" failonerror="true">
+ <fileset file="${example.jar.file}"/>
+ </copy>
+ </target>
+
+ <target name="release-bin-copy-readme">
+ <copy todir="${module.release}" overwrite="true" failonerror="true">
+ <fileset file="${basedir}/README.txt" />
+ </copy>
+ </target>
+
+ <target name="release-bin-other" depends="release-bin-copy-examples,release-bin-copy-readme"/>
+
<target name="release-bin" depends="release-bin-tasks"/>
<target name="bundle" depends="bundle-tasks"/>
diff --git a/qpid/java/client/example/bin/README.txt b/qpid/java/client/example/bin/README.txt
deleted file mode 100644
index 9a1ce91d41..0000000000
--- a/qpid/java/client/example/bin/README.txt
+++ /dev/null
@@ -1,11 +0,0 @@
-= Qpid Java Examples =
-
-For more information read ../README.txt.
-
-== The Verify All Script ==
-
-The verify_all script will run Java examples against itself and against the C++
-and Python examples. The success of the script is determined by comparing its
-output against what is expected.
-
-This script uses the verify script found in qpid/cpp/examples.
diff --git a/qpid/java/client/example/bin/set_classpath.bat b/qpid/java/client/example/bin/set_classpath.bat
deleted file mode 100644
index 862e8e467a..0000000000
--- a/qpid/java/client/example/bin/set_classpath.bat
+++ /dev/null
@@ -1,49 +0,0 @@
-@REM Licensed to the Apache Software Foundation (ASF) under one
-@REM or more contributor license agreements. See the NOTICE file
-@REM distributed with this work for additional information
-@REM regarding copyright ownership. The ASF licenses this file
-@REM to you under the Apache License, Version 2.0 (the
-@REM "License"); you may not use this file except in compliance
-@REM with the License. You may obtain a copy of the License at
-@REM
-@REM http://www.apache.org/licenses/LICENSE-2.0
-@REM
-@REM Unless required by applicable law or agreed to in writing,
-@REM software distributed under the License is distributed on an
-@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-@REM KIND, either express or implied. See the License for the
-@REM specific language governing permissions and limitations
-@REM under the License.
-
-@REM Helper script to set classpath for running Qpid example classes
-@REM NB: You must add the Qpid client and common jars to your CLASSPATH
-@REM before running this script
-
-@echo off
-
-if "%QPID_HOME%" == "" GOTO ERROR_QPID_HOME
-
-set QPIDLIB=%QPID_HOME%\lib
-
-if "%CLASSPATH%" == "" GOTO ERROR_CLASSPATH
-
-set CLASSPATH=%CLASSPATH%;%QPIDLIB%\geronimo-jms_1.1_spec-1.0.jar
-set CLASSPATH=%CLASSPATH%;%QPIDLIB%\commons-collections-3.1.jar
-set CLASSPATH=%CLASSPATH%;%QPIDLIB%\commons-configuration-1.2.jar
-set CLASSPATH=%CLASSPATH%;%QPIDLIB%\commons-cli-1.0.jar
-set CLASSPATH=%CLASSPATH%;%QPIDLIB%\commons-lang-2.1.jar
-set CLASSPATH=%CLASSPATH%;%QPIDLIB%\commons-logging-api-1.0.4.jar
-set CLASSPATH=%CLASSPATH%;%QPIDLIB%\commons-logging-1.0.jar
-set CLASSPATH=%CLASSPATH%;%QPIDLIB%\log4j-1.2.12.jar
-set CLASSPATH=%CLASSPATH%;%QPIDLIB%\mina-core-1.1.7.jar
-set CLASSPATH=%CLASSPATH%;%QPIDLIB%\mina-filter-ssl-1.1.7.jar
-set CLASSPATH=%CLASSPATH%;%QPIDLIB%\mina-java5-1.0.0.jar
-set CLASSPATH=%CLASSPATH%;%QPIDLIB%\slf4j-simple-1.0.jar
-
-GOTO END
-
-:ERROR_CLASSPATH
-Echo Please set set your CLASSPATH variable to include the Qpid client and common jars. Exiting ....
-:ERROR_QPID_HOME
-Echo Please set QPID_HOME variable. Exiting ....
-:END
diff --git a/qpid/java/client/example/bin/set_classpath.sh b/qpid/java/client/example/bin/set_classpath.sh
deleted file mode 100755
index a4f1b93625..0000000000
--- a/qpid/java/client/example/bin/set_classpath.sh
+++ /dev/null
@@ -1,82 +0,0 @@
-#!/bin/sh -xv
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Helper script to set classpath for running Qpid example classes
-# NB: You must add the Qpid client and common jars to your CLASSPATH
-# before running this script
-
-
-cygwin=false
-if [[ "$(uname -a | fgrep Cygwin)" != "" ]]; then
- cygwin=true
-fi
-
-#Should have set the QPID_HOME var after install to the working dir e.g. home/qpid/qpid-1.0-incubating-M2-SNAPSHOT
-if [ "$QPID_HOME" = "" ] ; then
- echo "ERROR: Please set QPID_HOME variable. Exiting ...."
- exit 1
-else
- QPIDLIB=$QPID_HOME/lib
-fi
-
-if $cygwin; then
- QPIDLIB=$(cygpath -w $QPIDLIB)
-fi
-
-if [ "$CLASSPATH" = "" ] ; then
- echo "ERROR: Please set set your CLASSPATH variable to include the Qpid client and common jars. Exiting ...."
- exit 2
-fi
-
-#Converts paths for cygwin if req
-#Some nasty concatenation to get round cygpath line limits
-if $cygwin; then
- SEP=";"
- CLASSPATH=`cygpath -w $CLASSPATH`
- CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/geronimo-jms_1.1_spec-1.0.jar`
- CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/commons-collections-3.1.jar`
- CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/commons-configuration-1.2.jar`
- CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/commons-cli-1.0.jar`
- CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/commons-lang-2.1.jar`
- CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/commons-logging-api-1.0.4.jar`
- CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/commons-logging-1.0.jar`
- CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/log4j-1.2.12.jar`
- CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/mina-core-1.1.7.jar`
- CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/mina-filter-ssl-1.1.7.jar`
- CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/mina-java5-1.0.0.jar`
- CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/slf4j-simple-1.0.jar`
- export CLASSPATH
-else
- CLASSPATH=$CLASSPATH:$QPIDLIB/backport-util-concurrent-2.2.jar
- CLASSPATH=$CLASSPATH:$QPIDLIB/geronimo-jms_1.1_spec-1.0.jar
- CLASSPATH=$CLASSPATH:$QPIDLIB/commons-collections-3.1.jar
- CLASSPATH=$CLASSPATH:$QPIDLIB/commons-configuration-1.2.jar
- CLASSPATH=$CLASSPATH:$QPIDLIB/commons-cli-1.0.jar
- CLASSPATH=$CLASSPATH:$QPIDLIB/commons-lang-2.1.jar
- CLASSPATH=$CLASSPATH:$QPIDLIB/commons-logging-api-1.0.4.jar
- CLASSPATH=$CLASSPATH:$QPIDLIB/commons-logging-1.0.jar
- CLASSPATH=$CLASSPATH:$QPIDLIB/log4j-1.2.12.jar
- CLASSPATH=$CLASSPATH:$QPIDLIB/mina-core-1.0.0.jar
- CLASSPATH=$CLASSPATH:$QPIDLIB/mina-filter-ssl-1.0.0.jar
- CLASSPATH=$CLASSPATH:$QPIDLIB/mina-java5-1.0.0.jar
- CLASSPATH=$CLASSPATH:$QPIDLIB/slf4j-simple-1.0.jar
- export CLASSPATH
-fi
-
diff --git a/qpid/java/client/example/source-jar.xml b/qpid/java/client/example/source-jar.xml
deleted file mode 100644
index 60451448b8..0000000000
--- a/qpid/java/client/example/source-jar.xml
+++ /dev/null
@@ -1,35 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-<!-- This is an assembly descriptor that produces a jar file that contains all the
- dependencies, fully expanded into a single jar, required to run the tests of
- a maven project.
- -->
-<assembly>
- <id>source</id>
- <formats>
- <format>jar</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <fileSets>
- <fileSet>
- <directory>src/main/java</directory>
- <outputDirectory></outputDirectory>
- </fileSet>
- </fileSets>
-</assembly>
diff --git a/qpid/java/client/example/src/main/java/README.txt b/qpid/java/client/example/src/main/java/README.txt
index 7b371f17b5..757054e492 100644
--- a/qpid/java/client/example/src/main/java/README.txt
+++ b/qpid/java/client/example/src/main/java/README.txt
@@ -23,267 +23,11 @@ Note: you must have write privileges to this directory in order to run
the examples.
-Running the Direct Examples
+Running the Examples
===========================
To run these programs, do the following:
- 1. Make sure that a qpidd broker is running:
-
- $ ps -eaf | grep qpidd
-
- If a broker is running, you should see the qpidd process in the
- output of the above command.
-
- 2. In the java directory, use runSample.sh to run the Consumer
- program:
-
- $ ./runSample.sh org.apache.qpid.example.jmsexample.direct.Consumer
- Using QPID_HOME: /usr/share/java/
- Using QPID_SAMPLE: /usr/share/doc/rhm-0.3
- Consumer: Setting an ExceptionListener on the connection as sample uses a MessageConsumer
- Consumer: Creating a non-transacted, auto-acknowledged session
- Consumer: Creating a MessageConsumer
- Consumer: Starting connection so MessageConsumer can receive messages
-
- 3. In a separate window, use runSample.sh to run the Producer
- program:
-
- $ ./runSample.sh org.apache.qpid.example.jmsexample.direct.Producer
- Using QPID_HOME: /usr/share/java/
- Using QPID_SAMPLE: /usr/share/doc/rhm-0.3
- Producer: Creating a non-transacted, auto-acknowledged session
- Producer: Creating a Message Producer
- Producer: Creating a TestMessage to send to the destination
- Producer: Sending message: 1
- Producer: Sending message: 2
- Producer: Sending message: 3
- Producer: Sending message: 4
- Producer: Sending message: 5
- Producer: Sending message: 6
- Producer: Sending message: 7
- Producer: Sending message: 8
- Producer: Sending message: 9
- Producer: Sending message: 10
- Producer: Closing connection
- Producer: Closing JNDI context
-
- 4. Now go back to the window where the Consumer program is
- running. You should see the following output:
-
- Consumer: Received message: Message 1
- Consumer: Received message: Message 2
- Consumer: Received message: Message 3
- Consumer: Received message: Message 4
- Consumer: Received message: Message 5
- Consumer: Received message: Message 6
- Consumer: Received message: Message 7
- Consumer: Received message: Message 8
- Consumer: Received message: Message 9
- Consumer: Received message: Message 10
- Consumer: Received final message That's all, folks!
- Consumer: Closing connection
- Consumer: Closing JNDI context
-
-
-
-Running the Fanout Examples
-===========================
-
-To run these programs, do the following:
-
- 1. Make sure that a qpidd broker is running:
-
- $ ps -eaf | grep qpidd
-
- If a broker is running, you should see the qpidd process in the
- output of the above command.
-
- 2. In the java directory, use runSample.sh to run the Consumer or
- Listener program, specifying a unique queue name, which must be
- “fanoutQueue1”, “fanoutQueue2”, or “fanoutQueue3”:
-
- $ ./runSample.sh org.apache.qpid.example.jmsexample.fanout.Consumer fanoutQueue1
- Using QPID_HOME: /usr/share/java/
- Using QPID_SAMPLE: /usr/share/doc/rhm-0.3
- Consumer: Setting an ExceptionListener on the connection as sample uses a MessageConsumer
- Consumer: Creating a non-transacted, auto-acknowledged session
- Consumer: Creating a MessageConsumer
- Consumer: Starting connection so MessageConsumer can receive messages
-
- You can do this in up to three windows, specifying a different
- name for each queue.
-
- 3. In a separate window, use runSample.sh to run the Producer
- program:
-
- $ ./runSample.sh org.apache.qpid.example.jmsexample.fanout.Producer
- Using QPID_HOME: /usr/share/java/
- Using QPID_SAMPLE: /usr/share/doc/rhm-0.3
- Producer: Creating a non-transacted, auto-acknowledged session
- Producer: Creating a Message Producer
- Producer: Creating a TestMessage to send to the destination
- Producer: Sending message: 1
- Producer: Sending message: 2
- Producer: Sending message: 3
- Producer: Sending message: 4
- Producer: Sending message: 5
- Producer: Sending message: 6
- Producer: Sending message: 7
- Producer: Sending message: 8
- Producer: Sending message: 9
- Producer: Sending message: 10
- Producer: Closing connection
- Producer: Closing JNDI context
-
- 4. Now go back to the window where the Listener program is
- running. You should see output like this:
-
- Consumer: Received message: Message 1
- Consumer: Received message: Message 2
- Consumer: Received message: Message 3
- Consumer: Received message: Message 4
- Consumer: Received message: Message 5
- Consumer: Received message: Message 6
- Consumer: Received message: Message 7
- Consumer: Received message: Message 8
- Consumer: Received message: Message 9
- Consumer: Received message: Message 10
- Consumer: Received final message That's all, folks!
- Consumer: Closing connection
- Consumer: Closing JNDI context
-
-
-Running the Publish/Subscribe Examples
-======================================
-
-To run these programs, do the following:
-
- 1. Make sure that a qpidd broker is running:
-
- $ ps -eaf | grep qpidd
-
- If a broker is running, you should see the qpidd process in the
- output of the above command.
-
- 2. In the java directory, use runSample.sh to run the Listener
- program:
-
- $ ./runSample.sh org.apache.qpid.example.jmsexample.pubsub.Listener
- Using QPID_HOME: /usr/share/java/
- Using QPID_SAMPLE: /usr/share/doc/rhm-0.3
- Listener: Setting an ExceptionListener on the connection as sample uses a TopicSubscriber
- Listener: Creating a non-transacted, auto-acknowledged session
- Listener: Creating a Message Subscriber for topic usa
- Listener: Creating a Message Subscriber for topic europe
- Listener: Creating a Message Subscriber for topic news
- Listener: Creating a Message Subscriber for topic weather
- Listener: Starting connection so TopicSubscriber can receive messages
-
- 3. In a separate window, use runSample.sh to run the Publisher
- program:
-
- $ ./runSample.sh org.apache.qpid.example.jmsexample.pubsub.Publisher
- Using QPID_HOME: /usr/share/java/
- Using QPID_SAMPLE: /usr/share/doc/rhm-0.3
- Publisher: Creating a non-transacted, auto-acknowledged session
- Publisher: Creating a TestMessage to send to the topics
- Publisher: Creating a Message Publisher for topic usa.weather
- Publisher: Sending message 1
- Publisher: Sending message 2
- Publisher: Sending message 3
- Publisher: Sending message 4
- Publisher: Sending message 5
- Publisher: Sending message 6
- Publisher: Creating a Message Publisher for topic usa.news
- Publisher: Sending message 1
- Publisher: Sending message 2
- Publisher: Sending message 3
- Publisher: Sending message 4
- Publisher: Sending message 5
- Publisher: Sending message 6
- Publisher: Creating a Message Publisher for topic europe.weather
- Publisher: Sending message 1
- Publisher: Sending message 2
- Publisher: Sending message 3
- Publisher: Sending message 4
- Publisher: Sending message 5
- Publisher: Sending message 6
- Publisher: Creating a Message Publisher for topic europe.news
- Publisher: Sending message 1
- Publisher: Sending message 2
- Publisher: Sending message 3
- Publisher: Sending message 4
- Publisher: Sending message 5
- Publisher: Sending message 6
- Publisher: Closing connection
- Publisher: Closing JNDI context
-
- 4. Now go back to the window where the Listener program is
- running. You should see output like this:
-
- Listener: Received message for topic: usa: message 1
- Listener: Received message for topic: weather: message 1
- Listener: Received message for topic: usa: message 2
- Listener: Received message for topic: weather: message 2
- Listener: Received message for topic: usa: message 3
- Listener: Received message for topic: weather: message 3
- Listener: Received message for topic: usa: message 4
- Listener: Received message for topic: weather: message 4
- Listener: Received message for topic: usa: message 5
- Listener: Received message for topic: weather: message 5
- Listener: Received message for topic: usa: message 6
- Listener: Received message for topic: weather: message 6
- . . .
- Listener: Shutting down listener for news
- Listener: Shutting down listener for weather
- Listener: Shutting down listener for usa
- Listener: Shutting down listener for europe
- Listener: Closing connection
- Listener: Closing JNDI context
-
-
-Running the Request/Response Examples
-=====================================
-
-To run these programs, do the following:
-
- 1. Make sure that a qpidd broker is running:
-
- $ ps -eaf | grep qpidd
-
- If a broker is running, you should see the qpidd process in the output of the above command.
-
- 2. In the java directory, use runSample.sh to run the Server
- program:
-
- $ ./runSample.sh org.apache.qpid.example.jmsexample.requestResponse.Server
- Using QPID_HOME: /usr/share/java/
- Using QPID_SAMPLE: /usr/share/doc/rhm-0.3
- Server: Setting an ExceptionListener on the connection as sample uses a MessageConsumer
- Server: Creating a non-transacted, auto-acknowledged session
- Server: Creating a MessageConsumer
- Server: Creating a MessageProducer
- Server: Starting connection so MessageConsumer can receive messages
-
- 3. In a separate window, use runSample.sh to run the Client
- program:
-
- $ ./runSample.sh org.apache.qpid.example.jmsexample.requestResponse.Client
- Using QPID_HOME: /usr/share/java/
- Using QPID_SAMPLE: /usr/share/doc/rhm-0.3
- Client: Setting an ExceptionListener on the connection as sample uses a MessageConsumer
- Client: Creating a non-transacted, auto-acknowledged session
- Client: Creating a QueueRequestor
- Client: Starting connection
- Client: Request Content= Twas brillig, and the slithy toves
- Client: Response Content= TWAS BRILLIG, AND THE SLITHY TOVES
- Client: Request Content= Did gire and gymble in the wabe.
- Client: Response Content= DID GIRE AND GYMBLE IN THE WABE.
- Client: Request Content= All mimsy were the borogroves,
- Client: Response Content= ALL MIMSY WERE THE BOROGROVES,
- Client: Request Content= And the mome raths outgrabe.
- Client: Response Content= AND THE MOME RATHS OUTGRABE.
- Client: Closing connection
- Client: Closing JNDI context
-
+ 1. Make sure that a Qpid broker is running.
+ 2. In the java directory, use runSample.sh to run the program:
+ $ ./runSample.sh <class name> <arguments> \ No newline at end of file
diff --git a/qpid/java/client/src/main/java/client.bnd b/qpid/java/client/src/main/java/client.bnd
index 8f0f936583..0ddd163d4f 100755
--- a/qpid/java/client/src/main/java/client.bnd
+++ b/qpid/java/client/src/main/java/client.bnd
@@ -1,7 +1,26 @@
-ver: 0.7.0
-
-Bundle-SymbolicName: qpid-client
-Bundle-Version: ${ver}
-Export-Package: *;version=${ver}
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+ver: 0.9.0
+
+Bundle-SymbolicName: qpid-client
+Bundle-Version: ${ver}
+Export-Package: *;version=${ver}
Bundle-RequiredExecutionEnvironment: J2SE-1.5
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index dbd742070e..ee3e0767d4 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -1452,16 +1452,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_logger.info("Not a hard-error connection not closing: " + cause);
}
-
- // deliver the exception if there is a listener
- if (_exceptionListener != null)
- {
- _exceptionListener.onException(je);
- }
- else
- {
- _logger.error("Throwable Received but no listener set: " + cause);
- }
// if we are closing the connection, close sessions first
if (closer)
@@ -1475,6 +1465,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_logger.error("Error closing all sessions: " + e, e);
}
}
+
+ // deliver the exception if there is a listener
+ if (_exceptionListener != null)
+ {
+ _exceptionListener.onException(je);
+ }
+ else
+ {
+ _logger.error("Throwable Received but no listener set: " + cause);
+ }
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 24e5253cc8..75f71a99c0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -577,7 +577,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
try
{
boolean isTopic;
-
+ Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments());
+
if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL)
{
isTopic = consumer.getDestination() instanceof AMQTopic ||
@@ -593,9 +594,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
preAcquire = !consumer.isNoConsume() &&
(isTopic || consumer.getMessageSelector() == null ||
consumer.getMessageSelector().equals(""));
+
+ arguments.putAll(
+ (Map<? extends String, ? extends Object>) consumer.getDestination().getLink().getSubscription().getArgs());
}
- Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments());
+
getQpidSession().messageSubscribe
(queueName.toString(), String.valueOf(tag),
getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 905bf5e111..4bac54b3e4 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -107,7 +107,7 @@ public abstract class BasicMessageConsumer<U extends UnprocessedMessage & AMQSes
/**
* We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
*/
- private final boolean _exclusive;
+ protected boolean _exclusive;
/**
* The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per
@@ -182,7 +182,7 @@ public abstract class BasicMessageConsumer<U extends UnprocessedMessage & AMQSes
_prefetchHigh = prefetchHigh;
_prefetchLow = prefetchLow;
_exclusive = exclusive;
-
+
_synchronousQueue = new LinkedBlockingQueue();
_autoClose = autoClose;
_noConsume = noConsume;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index d0f1f79631..699b52a6b1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -490,4 +490,24 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
clearReceiveQueue();
}
}
+
+ public boolean isExclusive()
+ {
+ AMQDestination dest = this.getDestination();
+ if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ {
+ if (dest.getAddressType() == AMQDestination.TOPIC_TYPE)
+ {
+ return true;
+ }
+ else
+ {
+ return dest.getLink().getSubscription().isExclusive();
+ }
+ }
+ else
+ {
+ return _exclusive;
+ }
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index cae11e3962..32c7ef29de 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -223,8 +223,10 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
String exchange = replyTo.getExchange();
String routingKey = replyTo.getRoutingKey();
- dest = generateDestination(exchange == null ? null : new AMQShortString(exchange),
- routingKey == null ? null : new AMQShortString(routingKey));
+ dest = generateDestination(exchange == null ? new AMQShortString("") :
+ new AMQShortString(exchange),
+ routingKey == null ? new AMQShortString(""):
+ new AMQShortString(routingKey));
_destinationCache.put(replyTo, new SoftReference<Destination>(dest));
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
index 64d5b16db0..00503cc650 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQDestination.Binding;
+import org.apache.qpid.client.messaging.address.Link.Subscription;
import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.client.messaging.address.Node.UnknownNodeType;
@@ -264,6 +265,7 @@ public class AddressHelper
public Link getLink()
{
Link link = new Link();
+ link.setSubscription(new Subscription());
if (linkProps != null)
{
link.setDurable(linkProps.getBoolean(DURABLE) == null ? false
@@ -283,7 +285,8 @@ public class AddressHelper
.setProducerCapacity(capacityProps
.getInt(CAPACITY_TARGET) == null ? 0
: capacityProps.getInt(CAPACITY_TARGET));
- } else
+ }
+ else
{
int cap = linkProps.getInt(CAPACITY) == null ? 0 : linkProps
.getInt(CAPACITY);
@@ -292,6 +295,21 @@ public class AddressHelper
}
link.setFilter(linkProps.getString(FILTER));
// so far filter type not used
+
+ if (((Map) address.getOptions().get(LINK)).containsKey(X_SUBSCRIBE))
+ {
+ Map x_subscribe = (Map)((Map) address.getOptions().get(LINK)).get(X_SUBSCRIBE);
+
+ if (x_subscribe.containsKey(ARGUMENTS))
+ {
+ link.getSubscription().setArgs((Map<String,Object>)x_subscribe.get(ARGUMENTS));
+ }
+
+ boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ?
+ Boolean.parseBoolean((String)x_subscribe.get(EXCLUSIVE)): false;
+
+ link.getSubscription().setExclusive(exclusive);
+ }
}
return link;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
index 0ebcaf548b..a7d19d1bd5 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.client.messaging.address;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.qpid.client.messaging.address.Node.QueueNode;
public class Link
@@ -34,6 +37,7 @@ public class Link
protected int _consumerCapacity = 0;
protected int _producerCapacity = 0;
protected Node node;
+ protected Subscription subscription;
public Node getNode()
{
@@ -114,4 +118,40 @@ public class Link
{
this.name = name;
}
+
+ public Subscription getSubscription()
+ {
+ return this.subscription;
+ }
+
+ public void setSubscription(Subscription subscription)
+ {
+ this.subscription = subscription;
+ }
+
+ public static class Subscription
+ {
+ private Map<String,Object> args = new HashMap<String,Object>();
+ private boolean exclusive = false;
+
+ public Map<String, Object> getArgs()
+ {
+ return args;
+ }
+
+ public void setArgs(Map<String, Object> args)
+ {
+ this.args = args;
+ }
+
+ public boolean isExclusive()
+ {
+ return exclusive;
+ }
+
+ public void setExclusive(boolean exclusive)
+ {
+ this.exclusive = exclusive;
+ }
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index edfb4bb16b..10250a1ac0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -66,7 +66,6 @@ import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.NetworkTransport;
import org.apache.qpid.transport.network.OutgoingNetworkTransport;
import org.apache.qpid.transport.network.Transport;
import org.slf4j.Logger;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 4236f20301..44376331ee 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -112,8 +112,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
_protocolHandler = protocolHandler;
_protocolVersion = connection.getProtocolVersion();
_logger.info("Using ProtocolVersion for Session:" + _protocolVersion);
- _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
- this);
+ _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), this);
_connection = connection;
}
diff --git a/qpid/java/common.xml b/qpid/java/common.xml
index 3ebf07a210..b1f28dc062 100644
--- a/qpid/java/common.xml
+++ b/qpid/java/common.xml
@@ -23,7 +23,7 @@
<dirname property="project.root" file="${ant.file.common}"/>
<property name="project.name" value="qpid"/>
- <property name="project.version" value="0.7"/>
+ <property name="project.version" value="0.9"/>
<property name="project.namever" value="${project.name}-${project.version}"/>
<property name="resources" location="${project.root}/resources"/>
diff --git a/qpid/java/common/src/main/java/common.bnd b/qpid/java/common/src/main/java/common.bnd
index 6cd8a52976..ef56ecec9e 100755
--- a/qpid/java/common/src/main/java/common.bnd
+++ b/qpid/java/common/src/main/java/common.bnd
@@ -1,4 +1,23 @@
-ver: 0.7.0
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+ver: 0.9.0
Bundle-SymbolicName: qpid-common
Bundle-Version: ${ver}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
index 37e731206c..a4db16742a 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
@@ -20,13 +20,11 @@
*/
package org.apache.qpid.transport.network;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.transport.codec.BBDecoder;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.Method;
@@ -35,19 +33,16 @@ import org.apache.qpid.transport.ProtocolEvent;
import org.apache.qpid.transport.ProtocolHeader;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Struct;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.qpid.transport.codec.BBDecoder;
/**
* Assembler
*/
public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
{
- private static final Logger _log = LoggerFactory.getLogger(Assembler.class);
-
private final Receiver<ProtocolEvent> receiver;
- private final Map<Integer,List<Frame>> segments;
- private final Method[] incomplete;
+ private final Map<Integer, List<Frame>> segments;
+ private final Map<Integer, Method> incomplete;
private final ThreadLocal<BBDecoder> decoder = new ThreadLocal<BBDecoder>()
{
public BBDecoder initialValue()
@@ -59,8 +54,9 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
public Assembler(Receiver<ProtocolEvent> receiver)
{
this.receiver = receiver;
- segments = new HashMap<Integer,List<Frame>>();
- incomplete = new Method[64*1024];
+ segments = new HashMap<Integer, List<Frame>>();
+ incomplete = new HashMap<Integer, Method>();
+// incomplete = new Method[64*1024];
}
private int segmentKey(Frame frame)
@@ -102,12 +98,12 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
public void exception(Throwable t)
{
- this.receiver.exception(t);
+ receiver.exception(t);
}
public void closed()
{
- this.receiver.closed();
+ receiver.closed();
}
public void init(ProtocolHeader header)
@@ -188,7 +184,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
command.read(dec);
if (command.hasPayload())
{
- incomplete[channel] = command;
+ incomplete.put(channel, command);
}
else
{
@@ -196,8 +192,8 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
}
break;
case HEADER:
- command = incomplete[channel];
- List<Struct> structs = new ArrayList(2);
+ command = incomplete.get(channel);
+ List<Struct> structs = new ArrayList<Struct>(2);
while (dec.hasRemaining())
{
structs.add(dec.readStruct32());
@@ -205,14 +201,14 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
command.setHeader(new Header(structs));
if (frame.isLastSegment())
{
- incomplete[channel] = null;
+ incomplete.remove(channel);
emit(channel, command);
}
break;
case BODY:
- command = incomplete[channel];
+ command = incomplete.get(channel);
command.setBody(segment);
- incomplete[channel] = null;
+ incomplete.remove(channel);
emit(channel, command);
break;
default:
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
index 87cabeb874..08b3fae528 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
@@ -20,9 +20,15 @@
*/
package org.apache.qpid.transport.network;
-import static org.apache.qpid.transport.network.Frame.*;
-
import static java.lang.Math.min;
+import static org.apache.qpid.transport.network.Frame.FIRST_FRAME;
+import static org.apache.qpid.transport.network.Frame.FIRST_SEG;
+import static org.apache.qpid.transport.network.Frame.HEADER_SIZE;
+import static org.apache.qpid.transport.network.Frame.LAST_FRAME;
+import static org.apache.qpid.transport.network.Frame.LAST_SEG;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.Method;
@@ -35,19 +41,14 @@ import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.Struct;
import org.apache.qpid.transport.codec.BBEncoder;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
/**
* Disassembler converts protocol events to byte buffers that can be sent on the network.
*/
public final class Disassembler implements Sender<ProtocolEvent>,
ProtocolDelegate<Void>
{
-
private final Sender<ByteBuffer> sender;
private final int maxPayload;
- private final ByteBuffer header;
private final Object sendlock = new Object();
private final ThreadLocal<BBEncoder> encoder = new ThreadLocal<BBEncoder>()
{
@@ -66,8 +67,6 @@ public final class Disassembler implements Sender<ProtocolEvent>,
}
this.sender = sender;
this.maxPayload = maxFrame - HEADER_SIZE;
- this.header = ByteBuffer.allocate(HEADER_SIZE);
- this.header.order(ByteOrder.BIG_ENDIAN);
}
@@ -78,39 +77,35 @@ public final class Disassembler implements Sender<ProtocolEvent>,
public void flush()
{
- synchronized (sendlock)
- {
- sender.flush();
- }
+ sender.flush();
}
public void close()
{
- synchronized (sendlock)
- {
- sender.close();
- }
+ sender.close();
}
private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf)
{
synchronized (sendlock)
{
- header.put(0, flags);
- header.put(1, type);
- header.putShort(2, (short) (size + HEADER_SIZE));
- header.put(5, track);
- header.putShort(6, (short) channel);
-
- header.rewind();
-
- sender.send(header);
- sender.flush();
+ ByteBuffer data = ByteBuffer.allocate(size + HEADER_SIZE);
+ data.order(ByteOrder.BIG_ENDIAN);
+
+ data.put(0, flags);
+ data.put(1, type);
+ data.putShort(2, (short) (size + HEADER_SIZE));
+ data.put(5, track);
+ data.putShort(6, (short) channel);
+ data.position(HEADER_SIZE);
int limit = buf.limit();
buf.limit(buf.position() + size);
- sender.send(buf);
+ data.put(buf);
buf.limit(limit);
+
+ data.rewind();
+ sender.send(data);
}
}
@@ -166,14 +161,6 @@ public final class Disassembler implements Sender<ProtocolEvent>,
method(method, SegmentType.COMMAND);
}
- private ByteBuffer copy(ByteBuffer src)
- {
- ByteBuffer buf = ByteBuffer.allocate(src.remaining());
- buf.put(src);
- buf.flip();
- return buf;
- }
-
private void method(Method method, SegmentType type)
{
BBEncoder enc = encoder.get();
@@ -228,7 +215,6 @@ public final class Disassembler implements Sender<ProtocolEvent>,
{
fragment(LAST_SEG, SegmentType.BODY, method, body);
}
-
}
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
index bb7f059d15..c17527c19c 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.transport.network;
-import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import org.apache.qpid.transport.TransportException;
@@ -34,7 +34,7 @@ public class Transport
public static final String UDP = "udp";
public static final String VM = "vm";
public static final String SOCKET = "socket";
- public static final String MULTICAST = "multicast";
+ public static final String MULTICAST = "multicast"; // TODO
public static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
public static final long DEFAULT_TIMEOUT = 60000;
@@ -43,20 +43,35 @@ public class Transport
public static final String MINA_TRANSPORT = "org.apache.qpid.transport.network.mina.MinaNetworkTransport";
public static final String IO_TRANSPORT = "org.apache.qpid.transport.network.io.IoNetworkTransport";
- public static final String NIO_TRANSPORT = "org.apache.qpid.transport.network.nio.NioNetworkTransport";
- public static final String NETTY_TRANSPORT = "org.apache.qpid.transport.network.netty.NettyNetworkTransport";
+ public static final String NIO_TRANSPORT = "org.apache.qpid.transport.network.nio.NioNetworkTransport"; // TODO
+ public static final String NETTY_TRANSPORT = "org.apache.qpid.transport.network.netty.NettyNetworkTransport"; // TODO
- private static final List<String> _incoming = new ArrayList<String>();
- private static final List<String> _outgoing = new ArrayList<String>();
+ private static final List<String> _incoming = new LinkedList<String>();
+ private static final List<String> _outgoing = new LinkedList<String>();
public static void registerIncomingTransport(Class<? extends IncomingNetworkTransport> transport)
{
- _incoming.add(transport.getName());
+ registerTransport(_incoming, transport.getName());
+ }
+
+ public static void registerIncomingTransport(String transport)
+ {
+ registerTransport(_incoming, transport);
}
public static void registerOutgoingTransport(Class<? extends OutgoingNetworkTransport> transport)
{
- _outgoing.add(transport.getName());
+ registerTransport(_outgoing, transport.getName());
+ }
+
+ public static void registerOutgoingTransport(String transport)
+ {
+ registerTransport(_outgoing, transport);
+ }
+
+ private static void registerTransport(List<String> registered, String transport)
+ {
+ registered.add(transport);
}
public static IncomingNetworkTransport getIncomingTransport() throws TransportException
@@ -71,7 +86,7 @@ public class Transport
public static OutgoingNetworkTransport getOutgoingTransport(String protocol) throws TransportException
{
- return (OutgoingNetworkTransport) getTransport("outgoing", _outgoing, MINA_TRANSPORT, protocol);
+ return (OutgoingNetworkTransport) getTransport("outgoing", _outgoing, IO_TRANSPORT, protocol);
}
private static NetworkTransport getTransport(String direction, List<String> registered, String defaultTransport, String protocol)
@@ -95,7 +110,7 @@ public class Transport
try
{
- String transport = System.getProperty("qpid.transport." + direction, MINA_TRANSPORT);
+ String transport = System.getProperty("qpid.transport." + direction, defaultTransport);
Class<?> clazz = Class.forName(transport);
NetworkTransport network = (NetworkTransport) clazz.newInstance();
if (protocol == null || network.isCompatible(protocol))
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
index aa480554ea..0aee08adbe 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
@@ -66,8 +66,8 @@ public class IoNetworkTransport implements OutgoingNetworkTransport
{
_socket = new Socket();
- _log.debug("default-SO_RCVBUF : %s", _socket.getReceiveBufferSize());
- _log.debug("default-SO_SNDBUF : %s", _socket.getSendBufferSize());
+ _log.debug("default SO_RCVBUF " + _socket.getReceiveBufferSize());
+ _log.debug("default SO_SNDBUF " + _socket.getSendBufferSize());
_socket.setTcpNoDelay(noDelay);
_socket.setKeepAlive(keepAlive);
@@ -75,8 +75,8 @@ public class IoNetworkTransport implements OutgoingNetworkTransport
_socket.setReceiveBufferSize(receiveBufferSize);
_socket.setReuseAddress(true);
- _log.debug("new-SO_RCVBUF : %s", _socket.getReceiveBufferSize());
- _log.debug("new-SO_SNDBUF : %s", _socket.getSendBufferSize());
+ _log.debug("new SO_RCVBUF " + _socket.getReceiveBufferSize());
+ _log.debug("new SO_SNDBUF " + _socket.getSendBufferSize());
InetAddress address = InetAddress.getByName(settings.getHost());
_socket.connect(new InetSocketAddress(address, settings.getPort()));
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
index babfc3d698..d53031e21b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
@@ -24,19 +24,21 @@ import static org.apache.qpid.transport.util.Functions.*;
import static org.apache.qpid.configuration.ClientProperties.*;
import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ExecutorThreadModel;
import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.LoggingFilter;
import org.apache.mina.filter.ReadThrottleFilterBuilder;
import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.util.SessionUtil;
import org.apache.qpid.protocol.ReceiverFactory;
import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.NetworkTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,12 +49,12 @@ public class MinaNetworkHandler extends IoHandlerAdapter
{
private static final Logger _log = LoggerFactory.getLogger(MinaNetworkHandler.class);
- private NetworkTransport _transport = null;
+ private MinaNetworkTransport _transport = null;
private SSLContextFactory _sslFactory = null;
private ReceiverFactory _factory = null;
private boolean _debug = false;
- public MinaNetworkHandler(NetworkTransport transport, SSLContextFactory sslFactory, ReceiverFactory factory)
+ public MinaNetworkHandler(MinaNetworkTransport transport, SSLContextFactory sslFactory, ReceiverFactory factory)
{
_transport = transport;
_sslFactory = sslFactory;
@@ -60,7 +62,7 @@ public class MinaNetworkHandler extends IoHandlerAdapter
_debug = Boolean.getBoolean("amqj.protocol.debug");
}
- public MinaNetworkHandler(NetworkTransport transport, SSLContextFactory sslFactory)
+ public MinaNetworkHandler(MinaNetworkTransport transport, SSLContextFactory sslFactory)
{
this(transport, sslFactory, null);
}
@@ -83,6 +85,7 @@ public class MinaNetworkHandler extends IoHandlerAdapter
public void exceptionCaught(IoSession ssn, Throwable e)
{
Receiver<java.nio.ByteBuffer> receiver = (Receiver) ssn.getAttachment();
+ _log.error("Caught exception in transport layer", e);
receiver.exception(e);
}
@@ -100,6 +103,12 @@ public class MinaNetworkHandler extends IoHandlerAdapter
SessionUtil.initialize(session);
IoFilterChain chain = session.getFilterChain();
+ if (chain.contains(ExecutorThreadModel.class.getName()))
+ {
+ chain.remove(ExecutorThreadModel.class.getName());
+ }
+ IoFilterAdapter filter = new ExecutorFilter(_transport.getExecutor());
+ chain.addFirst("sessionExecutor", filter);
// Add SSL filter
if (_sslFactory != null)
@@ -158,8 +167,6 @@ public class MinaNetworkHandler extends IoHandlerAdapter
{
_log.info("Idle MINA session: " + System.identityHashCode(session));
session.close();
- Receiver<java.nio.ByteBuffer> receiver = (Receiver) session.getAttachment();
- receiver.closed();
}
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
index 2010b2dd93..ac1b959de7 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
@@ -28,23 +28,27 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.ExecutorThreadModel;
import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoAcceptorConfig;
import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.PooledByteBufferAllocator;
import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.common.ThreadModel;
+import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.transport.socket.nio.DatagramAcceptor;
-import org.apache.mina.transport.socket.nio.DatagramAcceptorConfig;
import org.apache.mina.transport.socket.nio.DatagramConnector;
import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
import org.apache.mina.transport.socket.nio.SocketAcceptor;
-import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
@@ -71,7 +75,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
public static final List<String> SUPPORTED = Arrays.asList(Transport.SOCKET, Transport.TCP, Transport.UDP, Transport.VM);
private int _threads;
- private Executor _executor;
+ private ExecutorService _executor;
private ConnectionSettings _settings;
private SocketAddress _address;
private IoConnector _connector;
@@ -93,7 +97,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
}
- int processors = Runtime.getRuntime().availableProcessors();
+ int processors = (Runtime.getRuntime().availableProcessors() * 4) + 1;
_threads = Integer.parseInt(System.getProperty("amqj.processors", Integer.toString(processors)));
_executor = Executors.newCachedThreadPool(Threading.getThreadFactory());
}
@@ -130,7 +134,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
if (socket == null)
{
throw new IllegalArgumentException("Active Socket must be provided for broker " +
- "with 'socket://<SocketID>' transport");
+ "with 'socket://<SocketID>' transport");
}
_address = socket.getRemoteSocketAddress();
_connector = new ExistingSocketConnector(1, _executor);
@@ -142,25 +146,26 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
}
_log.info("Connecting to broker on: " + _address);
- String s = "-";
+ String name = "MINANetworkTransport(Client)";
StackTraceElement[] trace = Thread.currentThread().getStackTrace();
for (StackTraceElement elt : trace)
{
- if (elt.getClassName().contains("Test"))
+ if (elt.getClassName().endsWith("Test"))
{
- s += elt.getClassName();
- break;
+ name += "-" + elt.getClassName();
+// break; // FIXME
}
}
-
- IoServiceConfig cfg = _connector.getDefaultConfig();
- cfg.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkTransport(Client)" + s));
-
+
+ IoServiceConfig config = _connector.getDefaultConfig();
+ config.setThreadModel(ThreadModel.MANUAL);
+
// Socket based connection configuration only (TCP/SOCKET)
if (_connector instanceof SocketConnector)
{
- SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
+ SocketSessionConfig scfg = (SocketSessionConfig) config.getSessionConfig();
scfg.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
+ scfg.setKeepAlive(Boolean.getBoolean("amqj.keepAlive"));
Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", Transport.DEFAULT_BUFFER_SIZE);
Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", Transport.DEFAULT_BUFFER_SIZE);
scfg.setSendBufferSize(sendBufferSize);
@@ -173,7 +178,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
}
// Connect to the broker
- ConnectFuture future = _connector.connect(_address, new MinaNetworkHandler(this, sslFactory), cfg);
+ ConnectFuture future = _connector.connect(_address, new MinaNetworkHandler(this, sslFactory), config);
future.join();
if (!future.isConnected())
{
@@ -181,6 +186,14 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
}
_session = future.getSession();
_session.setAttachment(_receiver);
+
+ IoFilterChain chain = _session.getFilterChain();
+ if (chain.contains(ExecutorThreadModel.class.getName()))
+ {
+ chain.remove(ExecutorThreadModel.class.getName());
+ }
+ IoFilterAdapter filter = new ExecutorFilter(_executor);
+ chain.addFirst("clientExecutor", filter);
return new MinaNetworkConnection(_session);
}
@@ -191,9 +204,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
{
_acceptor = new SocketAcceptor(_threads, _executor);
- SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig();
- sconfig.setDisconnectOnUnbind(true);
- SocketSessionConfig ssc = (SocketSessionConfig) sconfig.getSessionConfig();
+ SocketSessionConfig ssc = (SocketSessionConfig) _acceptor.getDefaultConfig().getSessionConfig();
ssc.setReuseAddress(true);
ssc.setKeepAlive(Boolean.getBoolean("amqj.keepAlive"));
ssc.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
@@ -215,9 +226,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
{
_acceptor = new DatagramAcceptor(_executor);
- DatagramAcceptorConfig dconfig = (DatagramAcceptorConfig) _acceptor.getDefaultConfig();
- dconfig.setDisconnectOnUnbind(true);
- DatagramSessionConfig dsc = (DatagramSessionConfig) dconfig.getSessionConfig();
+ DatagramSessionConfig dsc = (DatagramSessionConfig) _acceptor.getDefaultConfig().getSessionConfig();
dsc.setReuseAddress(true);
Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", Transport.DEFAULT_BUFFER_SIZE);
Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", Transport.DEFAULT_BUFFER_SIZE);
@@ -235,16 +244,17 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
}
else if (settings.getProtocol().equalsIgnoreCase(Transport.VM))
{
- _acceptor = new VmPipeAcceptor();
- _address = new VmPipeAddress(settings.getPort());
+ _acceptor = new VmPipeAcceptor();
+ _address = new VmPipeAddress(settings.getPort());
}
else
{
throw new TransportException("Unknown protocol: " + settings.getProtocol());
}
- IoServiceConfig cfg = _acceptor.getDefaultConfig();
- cfg.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkTransport(Broker)"));
+ IoAcceptorConfig config = (IoAcceptorConfig) _acceptor.getDefaultConfig();
+ config.setThreadModel(ThreadModel.MANUAL);
+ config.setDisconnectOnUnbind(true);
try
{
@@ -255,6 +265,11 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
throw new TransportException("Could not bind to " + _address, e);
}
}
+
+ public Executor getExecutor()
+ {
+ return _executor;
+ }
public SocketAddress getAddress()
{
@@ -275,6 +290,10 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
{
_session.close();
}
+ if (_executor != null)
+ {
+ _executor.shutdownNow();
+ }
}
public boolean isCompatible(String protocol) {
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
index 5fc3032d35..10d70ed34f 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
@@ -26,20 +26,16 @@ import org.apache.mina.common.IoSession;
import org.apache.mina.common.WriteFuture;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import org.apache.qpid.transport.network.Transport;
/**
* MinaSender
*/
public class MinaSender implements Sender<java.nio.ByteBuffer>
{
- private static final Logger _log = LoggerFactory.getLogger(MinaSender.class);
-
private final IoSession _session;
- private WriteFuture _lastWrite;
- private int _idleTimeout = 0;
+ private int _idle = 0;
+ private WriteFuture _written;
public MinaSender(IoSession session)
{
@@ -52,41 +48,36 @@ public class MinaSender implements Sender<java.nio.ByteBuffer>
{
throw new TransportException("attempted to write to a closed socket");
}
- ByteBuffer mina = ByteBuffer.allocate(buf.capacity());
- mina.put(buf);
- mina.flip();
- flush();
- _lastWrite = _session.write(mina);
+ _written = _session.write(ByteBuffer.wrap(buf));
}
public synchronized void flush()
{
- if (_lastWrite != null)
+ if (_written != null)
{
- _lastWrite.join();
- if (!_lastWrite.isWritten())
- {
- throw new RuntimeException("Error flushing buffer");
- }
+ _written.join(Transport.DEFAULT_TIMEOUT);
+ if (!_written.isWritten())
+ {
+ throw new TransportException("Error flushing data buffer");
+ }
}
}
- public void close()
+ public synchronized void close()
{
- // MINA will sometimes throw away in-progress writes when you ask it to close
flush();
CloseFuture closed = _session.close();
closed.join();
}
- public void setIdleTimeout(int i)
+ public void setIdleTimeout(int idle)
{
- _idleTimeout = i;
- _session.setWriteTimeout(_idleTimeout);
+ _idle = idle;
+ _session.setWriteTimeout(_idle);
}
public long getIdleTimeout()
{
- return _idleTimeout;
+ return _idle;
}
}
diff --git a/qpid/java/lib/slf4j-api-1.4.0.jar b/qpid/java/lib/slf4j-api-1.4.0.jar
deleted file mode 100644
index 9ce2532aa4..0000000000
--- a/qpid/java/lib/slf4j-api-1.4.0.jar
+++ /dev/null
Binary files differ
diff --git a/qpid/java/lib/slf4j-api-1.6.1.jar b/qpid/java/lib/slf4j-api-1.6.1.jar
new file mode 100644
index 0000000000..42e0ad0de7
--- /dev/null
+++ b/qpid/java/lib/slf4j-api-1.6.1.jar
Binary files differ
diff --git a/qpid/java/lib/slf4j-log4j12-1.4.0.jar b/qpid/java/lib/slf4j-log4j12-1.4.0.jar
deleted file mode 100644
index e8e09f35ca..0000000000
--- a/qpid/java/lib/slf4j-log4j12-1.4.0.jar
+++ /dev/null
Binary files differ
diff --git a/qpid/java/lib/slf4j-log4j12-1.6.1.jar b/qpid/java/lib/slf4j-log4j12-1.6.1.jar
new file mode 100644
index 0000000000..873d11983e
--- /dev/null
+++ b/qpid/java/lib/slf4j-log4j12-1.6.1.jar
Binary files differ
diff --git a/qpid/java/management/client/README.txt b/qpid/java/management/client/README.txt
index 34a48f1f50..ecd61da75e 100644
--- a/qpid/java/management/client/README.txt
+++ b/qpid/java/management/client/README.txt
@@ -39,4 +39,4 @@ Administration
After QMan has been started successfully you can browse its administration console pointing your browser to :
-http://<host>:<port>/qman/admin.jsp \ No newline at end of file
+http://<host>:<port>/qman/console
diff --git a/qpid/java/management/client/bin/qman-wsdm-start.cmd b/qpid/java/management/client/bin/qman-wsdm-start.cmd
index df30ce8617..ec8321c6b8 100644
--- a/qpid/java/management/client/bin/qman-wsdm-start.cmd
+++ b/qpid/java/management/client/bin/qman-wsdm-start.cmd
@@ -59,8 +59,8 @@ SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\start.jar
SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\jetty-6.1.14.jar
SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\jetty-util-6.1.14.jar
SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\geronimo-servlet_2.5_spec-1.2.jar
-SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\slf4j-api-1.4.0.jar
-SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\slf4j-log4j12-1.4.0.jar
+SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\slf4j-api-1.6.1.jar
+SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\slf4j-log4j12-1.6.1.jar
SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\log4j-1.2.12.jar
echo ===============================================================================
@@ -85,4 +85,4 @@ echo.
echo ===============================================================================
echo.
-%JAVA% -cp %CLASSPATH% -DQMAN_HOME=%QMAN_HOME% -Djetty.home=%QMAN_HOME% -Dqman.host=%QMAN_WSDM_ADAPTER_HOST% -Dqman.port=%QMAN_WSDM_ADAPTER_PORT% -DSTOP.PORT=%ADMIN_PORT% -DSTOP.KEY=%ADMIN_KEY% -Dqman-config=%QMAN_CONFIG_FILE% org.mortbay.start.Main %JETTY_CONFIG_FILE% \ No newline at end of file
+%JAVA% -cp %CLASSPATH% -DQMAN_HOME=%QMAN_HOME% -Djetty.home=%QMAN_HOME% -Dqman.host=%QMAN_WSDM_ADAPTER_HOST% -Dqman.port=%QMAN_WSDM_ADAPTER_PORT% -DSTOP.PORT=%ADMIN_PORT% -DSTOP.KEY=%ADMIN_KEY% -Dqman-config=%QMAN_CONFIG_FILE% org.mortbay.start.Main %JETTY_CONFIG_FILE%
diff --git a/qpid/java/management/client/bin/qman-wsdm-start.sh b/qpid/java/management/client/bin/qman-wsdm-start.sh
index 39a4cba66e..0024890527 100644
--- a/qpid/java/management/client/bin/qman-wsdm-start.sh
+++ b/qpid/java/management/client/bin/qman-wsdm-start.sh
@@ -58,7 +58,7 @@ ADMIN_KEY=gazzax
QMAN_LIBS=$QMAN_HOME/lib
JETTY_CONFIG_FILE=$QMAN_HOME/etc/jetty.xml
-QMAN_CLASSPATH=$QMAN_HOME/etc:$QMAN_LIBS/start.jar:$QMAN_LIBS/jetty-6.1.14.jar:$QMAN_LIBS/jetty-util-6.1.14.jar:$QMAN_LIBS/geronimo-servlet_2.5_spec-1.2.jar:$QMAN_LIBS/slf4j-api-1.4.0.jar:$QMAN_LIBS/slf4j-log4j12-1.4.0.jar:$QMAN_LIBS/log4j-1.2.12.jar
+QMAN_CLASSPATH=$QMAN_HOME/etc:$QMAN_LIBS/start.jar:$QMAN_LIBS/jetty-6.1.14.jar:$QMAN_LIBS/jetty-util-6.1.14.jar:$QMAN_LIBS/geronimo-servlet_2.5_spec-1.2.jar:$QMAN_LIBS/slf4j-api-1.6.1.jar:$QMAN_LIBS/slf4j-log4j12-1.6.1.jar:$QMAN_LIBS/log4j-1.2.12.jar
echo "==============================================================================="
echo""
@@ -86,4 +86,4 @@ echo""
echo "==============================================================================="
echo""
-"$JAVA" $JAVA_OPTS -cp $QMAN_CLASSPATH -DQMAN_HOME=$QMAN_HOME -Djetty.home=$QMAN_HOME -Dqman.host=$QMAN_WSDM_ADAPTER_HOST -Dqman.port=$QMAN_WSDM_ADAPTER_PORT -DSTOP.PORT=$ADMIN_PORT -DSTOP.KEY=$ADMIN_KEY -Dqman-config=$QMAN_CONFIG_FILE org.mortbay.start.Main $JETTY_CONFIG_FILE \ No newline at end of file
+"$JAVA" $JAVA_OPTS -cp $QMAN_CLASSPATH -DQMAN_HOME=$QMAN_HOME -Djetty.home=$QMAN_HOME -Dqman.host=$QMAN_WSDM_ADAPTER_HOST -Dqman.port=$QMAN_WSDM_ADAPTER_PORT -DSTOP.PORT=$ADMIN_PORT -DSTOP.KEY=$ADMIN_KEY -Dqman-config=$QMAN_CONFIG_FILE org.mortbay.start.Main $JETTY_CONFIG_FILE
diff --git a/qpid/java/management/client/build.xml b/qpid/java/management/client/build.xml
index f623449c4b..a67f69c43d 100644
--- a/qpid/java/management/client/build.xml
+++ b/qpid/java/management/client/build.xml
@@ -44,9 +44,9 @@
<copy todir="${module.release}" failonerror="false" flatten="true">
<fileset dir="${resources}" excludes="META-INF">
<exclude name="META-INF"/>
- <exclude name="README"/>
+ <exclude name="README.txt"/>
</fileset>
- <fileset file="${module.build}${file.separator}README"/>
+ <fileset file="${module.build}${file.separator}README.txt"/>
</copy>
</target>
@@ -153,7 +153,7 @@
<target name="copy-README-to-build">
<copy todir="${module.build}">
<fileset dir="${module.src}${file.separator}..${file.separator}..${file.separator}..">
- <include name="README"/>
+ <include name="README.txt"/>
</fileset>
</copy>
</target>
@@ -163,7 +163,7 @@
<mkdir dir="${examples.folder}${file.separator}sample_messages"/>
<copy todir="${examples.folder}">
<fileset dir="${module.src}${file.separator}..${file.separator}..${file.separator}example">
- <include name="README"/>
+ <include name="README.txt"/>
</fileset>
</copy>
<copy todir="${examples.folder}${file.separator}src">
@@ -175,7 +175,7 @@
<copy todir="${examples.folder}${file.separator}sample_messages">
<fileset dir="${module.src}${file.separator}..${file.separator}..${file.separator}example" >
<exclude name="**/*.java"/>
- <exclude name="**/README"/>
+ <exclude name="**/README.txt"/>
<include name="**/*.out.*"/>
</fileset>
</copy>
diff --git a/qpid/java/management/common/src/main/java/management-common.bnd b/qpid/java/management/common/src/main/java/management-common.bnd
index 3b2c34b06e..cb28d309a6 100644
--- a/qpid/java/management/common/src/main/java/management-common.bnd
+++ b/qpid/java/management/common/src/main/java/management-common.bnd
@@ -1,8 +1,27 @@
-ver: 0.7.0
-
-Bundle-SymbolicName: qpid-management-common
-Bundle-Version: ${ver}
-Export-Package: *;version=${ver}
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+ver: 0.9.0
+
+Bundle-SymbolicName: qpid-management-common
+Bundle-Version: ${ver}
+Export-Package: *;version=${ver}
Bundle-RequiredExecutionEnvironment: J2SE-1.5
Require-Bundle: jmxremote.sasl;resolution:=optional
diff --git a/qpid/java/management/eclipse-plugin/META-INF/MANIFEST.MF b/qpid/java/management/eclipse-plugin/META-INF/MANIFEST.MF
index 7db972995b..124fe1e767 100644
--- a/qpid/java/management/eclipse-plugin/META-INF/MANIFEST.MF
+++ b/qpid/java/management/eclipse-plugin/META-INF/MANIFEST.MF
@@ -3,7 +3,7 @@ Bundle-License: http://www.apache.org/licenses/LICENSE-2.0.txt
Bundle-ManifestVersion: 2
Bundle-Name: Qpid JMX Management Console Plug-in
Bundle-SymbolicName: org.apache.qpid.management.ui; singleton:=true
-Bundle-Version: 0.7.0
+Bundle-Version: 0.9.0
Bundle-Activator: org.apache.qpid.management.ui.Activator
Bundle-Vendor: Apache Software Foundation
Bundle-Localization: plugin
diff --git a/qpid/java/management/eclipse-plugin/src/main/resources/jmxremote.sasl-plugin/MANIFEST.MF b/qpid/java/management/eclipse-plugin/src/main/resources/jmxremote.sasl-plugin/MANIFEST.MF
index fa11bac2ea..83c7c9f435 100644
--- a/qpid/java/management/eclipse-plugin/src/main/resources/jmxremote.sasl-plugin/MANIFEST.MF
+++ b/qpid/java/management/eclipse-plugin/src/main/resources/jmxremote.sasl-plugin/MANIFEST.MF
@@ -1,4 +1,5 @@
Manifest-Version: 1.0
+Bundle-License: http://www.apache.org/licenses/LICENSE-2.0.txt
Bundle-ManifestVersion: 2
Bundle-Name: jmx sasl Plug-in
Bundle-SymbolicName: jmxremote.sasl
diff --git a/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/Info.plist b/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/Info.plist
index e06c8a6e60..c6482a9254 100644
--- a/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/Info.plist
+++ b/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/Info.plist
@@ -1,5 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
<plist version="1.0">
<dict>
<key>CFBundleExecutable</key>
diff --git a/qpid/java/release-docs/RELEASE_NOTES.txt b/qpid/java/release-docs/RELEASE_NOTES.txt
index 690b04dfc0..f94c45fd4d 100644
--- a/qpid/java/release-docs/RELEASE_NOTES.txt
+++ b/qpid/java/release-docs/RELEASE_NOTES.txt
@@ -1,9 +1,10 @@
-Apache Qpid Incubating Java M4 Release Notes
+Apache Qpid Java 0.8 Release Notes
-------------------------------------------
-The Qpid M4 release contains support for AMQP 0-8, 0-9 and 0-10. You
+The Qpid 0.8 release contains support for AMQP 0-8, 0-9 and 0-10. You
can access the specifications from
-http://www.amqp.org/tikiwiki/tiki-index.php?page=Download
+
+http://www.amqp.org/confluence/display/AMQP/AMQP+Specification
For full details of Apache Qpid's capabilities see our detailed
project documentation at:
@@ -13,21 +14,6 @@ http://cwiki.apache.org/confluence/display/qpid/Qpid+Java+Documentation
From the link above you can access our Getting Started Guide, FAQ, Build How To
and detailed developer documentation.
-New features, Improvements and Bug fixes
-----------------------
-
-A security related problem was addressed. If Base64MD5 passwords are
-turned on on the broker and it has been configured to use JMXMP via
-the addition of jxmremote_optional.jar to the classpath, it is
-possible for an attacker to bypass the authentication on the JMX
-management interface due to a bug in password verification.
-
-A new command line management interface was added (qpid-cli)
-
-A full list of changes can be found at:
-https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12313279&styleName=Text&projectId=12310520
-
-
Known Issues/Outstanding Work
-----------------------------
diff --git a/qpid/java/systests/etc/config-systests-settings.xml b/qpid/java/systests/etc/config-systests-settings.xml
index a7f538aec1..751ff133cb 100644
--- a/qpid/java/systests/etc/config-systests-settings.xml
+++ b/qpid/java/systests/etc/config-systests-settings.xml
@@ -24,6 +24,8 @@
<enabled>false</enabled>
<ssl>
<enabled>false</enabled>
+ <keyStorePath>${QPID_HOME}/../test-profiles/test_resources/ssl/keystore.jks</keyStorePath>
+ <keyStorePassword>password</keyStorePassword>
</ssl>
</management>
<virtualhosts>${QPID_HOME}/etc/virtualhosts-systests.xml</virtualhosts>
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
index 8946548353..2dded57dd0 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
@@ -138,7 +138,8 @@ public class AMQQueueDeferredOrderingTest extends QpidBrokerTestCase
Message msg = _consumer.receive(3000);
assertNotNull("Message should not be null", msg);
assertTrue("Message should be a text message", msg instanceof TextMessage);
- assertEquals("Message content does not match expected", Integer.toString(i), ((TextMessage) msg).getText());
+ _logger.error("== " + Integer.toString(i) + " == " + ((TextMessage) msg).getText());
+ assertEquals("Message content does not match", Integer.toString(i), ((TextMessage) msg).getText());
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
index acb5d12e57..43a0f4dbec 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
@@ -169,7 +169,7 @@ public class QueueDepthWithSelectorTest extends QpidBrokerTestCase
for (int i = 0; i < MSG_COUNT; i++)
{
_messages[i] = _consumer.receive(1000);
- assertNotNull("should have received a message but didn't", _messages[i]);
+ assertNotNull("should have received a message but didn't " + i, _messages[i]);
}
// long queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _queue);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index af7c08ca65..1d1f4a53d5 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -280,6 +280,10 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("my-exchange",
dest.getQueueName(),"hello", Collections.<String, Object>emptyMap()));
+
+ // The client should be able to query and verify the existence of my-exchange (QPID-2774)
+ dest = new AMQAnyDestination("ADDR:my-exchange; {create: never}");
+ cons = jmsSession.createConsumer(dest);
}
public void testBindQueueWithArgs() throws Exception
@@ -684,9 +688,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
/**
- * Test Goal : Verify that unique subscription queues are created when consumers are
- * created using the same destination except when the subscription queue
- * has a name.
+ * Test Goal : When the same destination is used when creating two consumers,
+ * If the type == topic, verify that unique subscription queues are created,
+ * unless subscription queue has a name.
+ *
+ * If the type == queue, same queue should be shared.
*/
public void testSubscriptionForSameDestination() throws Exception
{
@@ -715,6 +721,28 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
catch(Exception e)
{
}
+ _connection.close();
+
+ _connection = getConnection() ;
+ _connection.start();
+ ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ dest = ssn.createTopic("ADDR:my_queue; {create: always}");
+ consumer1 = ssn.createConsumer(dest);
+ consumer2 = ssn.createConsumer(dest);
+ prod = ssn.createProducer(dest);
+
+ prod.send(ssn.createTextMessage("A"));
+ Message m1 = consumer1.receive(1000);
+ Message m2 = consumer2.receive(1000);
+
+ if (m1 != null)
+ {
+ assertNull("Only one consumer should receive the message",m2);
+ }
+ else
+ {
+ assertNotNull("Only one consumer should receive the message",m2);
+ }
}
public void testXBindingsWithoutExchangeName() throws Exception
@@ -752,4 +780,20 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
assertNotNull("consumer should receive a message",cons.receive(1000));
cons.close();
}
+
+ public void testXSubscribeOverrides() throws Exception
+ {
+ Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ String str = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}";
+ Destination dest = ssn.createTopic(str);
+ MessageConsumer consumer1 = ssn.createConsumer(dest);
+ try
+ {
+ MessageConsumer consumer2 = ssn.createConsumer(dest);
+ fail("An exception should be thrown as 'my-queue' already have an exclusive subscriber");
+ }
+ catch(Exception e)
+ {
+ }
+ }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java
index 59ce64eb4f..8c5299e301 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java
@@ -239,7 +239,8 @@ public class BytesMessageTest extends QpidBrokerTestCase implements MessageListe
{
if (expected[i] != actual[i])
{
- throw new RuntimeException("Failed on byte " + i + " of " + expected.length);
+ throw new RuntimeException("Failed on byte " + i + " of " + expected.length +
+ "(" + new String(expected) +", " + new String(actual) + ")");
}
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java
index d97e22e024..e6f3ef7493 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java
@@ -41,11 +41,18 @@ public class LargeMessageTest extends QpidBrokerTestCase
private static final Logger _logger = LoggerFactory.getLogger(LargeMessageTest.class);
private Destination _destination;
- private AMQSession _session;
+ private AMQSession<?, ?> _session;
private AMQConnection _connection;
protected void setUp() throws Exception
{
+ // Smaller packet size for UDP
+ if (Boolean.getBoolean("profile.udp"))
+ {
+ setConfigurationProperty("advanced.framesize", "20000");
+ setBrokerEnvironment("qpid.maxFrameSize", "20000");
+ }
+
super.setUp();
try
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java
index 17ac0dfff2..50162449e5 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java
@@ -46,7 +46,7 @@ public class ConnectionCloseTest extends QpidBrokerTestCase
public void testSendReceiveClose() throws Exception
{
Map<Thread,StackTraceElement[]> before = Thread.getAllStackTraces();
-
+
for (int i = 0; i < 500; i++)
{
if ((i % 10) == 0)
@@ -92,7 +92,7 @@ public class ConnectionCloseTest extends QpidBrokerTestCase
assertTrue("Spurious thread creation exceeded threshold, " +
delta.size() + " threads created.",
- delta.size() < 100);
+ delta.size() < 10);
}
private void dumpStacks(Map<Thread,StackTraceElement[]> map)
@@ -104,5 +104,4 @@ public class ConnectionCloseTest extends QpidBrokerTestCase
log.warn(t, entry.getKey().toString());
}
}
-
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
index 6bf610ff90..68d774fce6 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
@@ -272,6 +272,28 @@ public class ConnectionTest extends QpidBrokerTestCase
}
connection.close();
}
+
+ public void testUnsupportedSASLMechanism() throws Exception
+ {
+ BrokerDetails broker = getBroker();
+ broker.setProperty(BrokerDetails.OPTIONS_SASL_MECHS, "MY_MECH");
+
+ try
+ {
+ Connection connection = new AMQConnection(broker.toString(), "guest", "guest",
+ null, "test");
+ connection.close();
+ fail("The client should throw a ConnectionException stating the" +
+ " broker does not support the SASL mech specified by the client");
+ }
+ catch (Exception e)
+ {
+ assertTrue("Incorrect exception thrown",
+ e.getMessage().contains("The following SASL mechanisms " +
+ "[MY_MECH]" +
+ " specified by the client are not supported by the broker"));
+ }
+ }
public static junit.framework.Test suite()
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index d73761d12a..d799b141c0 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -657,7 +657,14 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
session.commit();
// Check queue has no messages
- assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue));
+ if (isJavaBroker())
+ {
+ assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue));
+ }
+ else
+ {
+ assertTrue("At most the queue should have only 1 message", ((AMQSession<?, ?>) session).getQueueDepth(queue) <= 1);
+ }
// Unsubscribe
session.unsubscribe("sameMessageSelector");
@@ -671,7 +678,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
* <li>create another durable subscriber with a different selector and same name
* <li>check first subscriber is now closed
* <li>create a publisher and send messages
- * <li>check messages are recieved correctly
+ * <li>check messages are received correctly
* </ul>
* <p>
* QPID-2418
@@ -704,6 +711,8 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
e.printStackTrace();
}
+ conn.stop();
+
// Send 1 matching message and 1 non-matching message
MessageProducer producer = session.createProducer(topic);
TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
@@ -718,6 +727,8 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorNoClose");
assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession<?, ?>) session).getQueueDepth(queue));
+ conn.start();
+
Message rMsg = subB.receive(1000);
assertNotNull(rMsg);
assertEquals("Content was wrong",
@@ -768,6 +779,8 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
e.printStackTrace();
}
+ conn.stop();
+
// Send 1 matching message and 1 non-matching message
MessageProducer producer = session.createProducer(topic);
TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
@@ -782,6 +795,8 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "subscriptionName");
assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession<?, ?>) session).getQueueDepth(queue));
+ conn.start();
+
Message rMsg = subTwo.receive(1000);
assertNotNull(rMsg);
assertEquals("Content was wrong",
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java
index f631a9a4ba..074c2fa566 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java
@@ -243,11 +243,14 @@ public class TransactionTimeoutTestCase extends QpidBrokerTestCase implements Ex
/** @see javax.jms.ExceptionListener#onException(javax.jms.JMSException) */
public void onException(JMSException jmse)
{
- _caught.countDown();
- _message = jmse.getLinkedException().getMessage();
- if (jmse.getLinkedException() instanceof AMQException)
+ if (_caught.getCount() > 0L)
{
- _code = ((AMQException) jmse.getLinkedException()).getErrorCode();
+ _caught.countDown();
+ _message = jmse.getLinkedException().getMessage();
+ if (jmse.getLinkedException() instanceof AMQException)
+ {
+ _code = ((AMQException) jmse.getLinkedException()).getErrorCode();
+ }
}
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
index 96fc865fb4..bf5d32d2e6 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
@@ -639,20 +639,23 @@ public class QpidBrokerTestCase extends QpidTestCase
public void stopBroker(int port) throws Exception
{
- port = getPort(port);
-
_logger.info("stopping broker: " + getBrokerCommand(port) + " on port " + port);
- Process process = _brokers.remove(port);
- if (process != null)
- {
- process.destroy();
- process.waitFor();
- _logger.info("broker exited: " + process.exitValue());
- }
- else if (_broker.equals(VM))
+ if (_broker.equals(VM))
{
VmBroker.killVMBroker();
}
+ else
+ {
+ port = getPort(port);
+
+ Process process = _brokers.remove(port);
+ if (process != null)
+ {
+ process.destroy();
+ process.waitFor();
+ _logger.info("broker exited: " + process.exitValue());
+ }
+ }
}
/**
@@ -973,20 +976,22 @@ public class QpidBrokerTestCase extends QpidTestCase
protected void tearDown() throws Exception
{
- try
+ // close all the connections used by this test.
+ for (Connection c : _connections)
{
- // close all the connections used by this test.
- for (Connection c : _connections)
- {
+ try
+ {
c.close();
}
+ catch (Exception e)
+ {
+ _logger.warn("Error closing connection", e);
+ }
}
- finally
- {
- // Ensure any problems with close does not interfer with property resets
- super.tearDown();
- revertLoggingLevels();
- }
+
+ // Ensure any problems with close does not interfer with property resets
+ super.tearDown();
+ revertLoggingLevels();
}
/**
diff --git a/qpid/java/test-profiles/08StandaloneExcludes b/qpid/java/test-profiles/08StandaloneExcludes
index a497eaa355..5c225e3b2e 100644
--- a/qpid/java/test-profiles/08StandaloneExcludes
+++ b/qpid/java/test-profiles/08StandaloneExcludes
@@ -1,3 +1,22 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
//======================================================================
//Exclude the following from brokers defaulting to the 0-8 protocol
//======================================================================
@@ -19,4 +38,6 @@ org.apache.qpid.test.unit.message.UTF8Test#*
org.apache.qpid.client.MessageListenerTest#testSynchronousRecieveNoWait
// XA Needs 0-10
-org.apache.qpid.test.unit.xa.* \ No newline at end of file
+org.apache.qpid.test.unit.xa.*
+
+org.apache.qpid.test.unit.client.connection.ConnectionTest#testUnsupportedSASLMechanism
diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes
index 711a3954e4..e89b09cca2 100755
--- a/qpid/java/test-profiles/CPPExcludes
+++ b/qpid/java/test-profiles/CPPExcludes
@@ -1,3 +1,22 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
org.apache.qpid.test.unit.client.channelclose.ChannelCloseTest#*
org.apache.qpid.client.ResetMessageListenerTest#*
diff --git a/qpid/java/test-profiles/CPPNoPrefetchExcludes b/qpid/java/test-profiles/CPPNoPrefetchExcludes
index df188ef628..ebcd430161 100644
--- a/qpid/java/test-profiles/CPPNoPrefetchExcludes
+++ b/qpid/java/test-profiles/CPPNoPrefetchExcludes
@@ -1,3 +1,22 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
org.apache.qpid.test.unit.transacted.TransactedTest#testRollback
org.apache.qpid.test.client.QueueBrowserAutoAckTest#testFailoverWithQueueBrowser
diff --git a/qpid/java/test-profiles/CPPPrefetchExcludes b/qpid/java/test-profiles/CPPPrefetchExcludes
index 6b0014b917..7ef52f89c7 100644
--- a/qpid/java/test-profiles/CPPPrefetchExcludes
+++ b/qpid/java/test-profiles/CPPPrefetchExcludes
@@ -1,3 +1,22 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
// those tests should be run with prefetch off
org.apache.qpid.client.MessageListenerMultiConsumerTest#testRecieveC2Only
org.apache.qpid.client.MessageListenerMultiConsumerTest#testRecieveBoth
diff --git a/qpid/java/test-profiles/CPPTransientExcludes b/qpid/java/test-profiles/CPPTransientExcludes
index 90b4251807..47f24db19c 100644
--- a/qpid/java/test-profiles/CPPTransientExcludes
+++ b/qpid/java/test-profiles/CPPTransientExcludes
@@ -1,3 +1,22 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
// those tests need durable subscribe states to be persisted
org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubRestoredAfterNonPersistentMessageSent
diff --git a/qpid/java/test-profiles/Excludes b/qpid/java/test-profiles/Excludes
index 12905f238e..9cfc19999c 100644
--- a/qpid/java/test-profiles/Excludes
+++ b/qpid/java/test-profiles/Excludes
@@ -1,3 +1,22 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
//======================================================================
//These tests are *always* excluded
//======================================================================
@@ -34,4 +53,4 @@ org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#*
org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#*
// QPID-2418 : The queue backing the dur sub is not currently deleted at subscription change, so the test will fail.
-org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSelectorAndRestart \ No newline at end of file
+org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSelectorAndRestart
diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes
index eecffbbd2c..533e6976be 100755
--- a/qpid/java/test-profiles/Java010Excludes
+++ b/qpid/java/test-profiles/Java010Excludes
@@ -1,3 +1,22 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
//======================================================================
//These tests will not work over AMQP 0-10
//======================================================================
@@ -16,6 +35,9 @@ org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFails
org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFailsNoRouteNoTxPubSub
org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFailsNoRouteTxPubSub
+// 0-10 does not have AMQProtocolHandler access
+org.apache.qpid.test.unit.client.channelclose.ChannelCloseTest#*
+
//this test checks explicitly for 0-8 flow control semantics
org.apache.qpid.test.client.FlowControlTest#*
diff --git a/qpid/java/test-profiles/JavaExcludes b/qpid/java/test-profiles/JavaExcludes
index 57fb525f32..b38b482be5 100644
--- a/qpid/java/test-profiles/JavaExcludes
+++ b/qpid/java/test-profiles/JavaExcludes
@@ -1,3 +1,22 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
//======================================================================
//These tests do not work with the Java broker
//======================================================================
diff --git a/qpid/java/test-profiles/JavaInVMExcludes b/qpid/java/test-profiles/JavaInVMExcludes
index c51da125be..65981e4801 100644
--- a/qpid/java/test-profiles/JavaInVMExcludes
+++ b/qpid/java/test-profiles/JavaInVMExcludes
@@ -1,3 +1,22 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
//======================================================================
//Exclude the following tests when running the InVM default test profile
//======================================================================
diff --git a/qpid/java/test-profiles/JavaPersistentExcludes b/qpid/java/test-profiles/JavaPersistentExcludes
index 54650648ed..0c7be9e509 100644
--- a/qpid/java/test-profiles/JavaPersistentExcludes
+++ b/qpid/java/test-profiles/JavaPersistentExcludes
@@ -1,3 +1,22 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
//======================================================================
//These tests require the MemoryMessageStore
//======================================================================
diff --git a/qpid/java/test-profiles/JavaStandaloneExcludes b/qpid/java/test-profiles/JavaStandaloneExcludes
index ca3a872119..4ec9f1fc02 100644
--- a/qpid/java/test-profiles/JavaStandaloneExcludes
+++ b/qpid/java/test-profiles/JavaStandaloneExcludes
@@ -1,3 +1,22 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
//======================================================================
//These tests require an InVm broker
//======================================================================
diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes
index f192aadccd..7f2f1c2d90 100644
--- a/qpid/java/test-profiles/JavaTransientExcludes
+++ b/qpid/java/test-profiles/JavaTransientExcludes
@@ -1,3 +1,22 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
//======================================================================
//These tests require a persistent store
//======================================================================
diff --git a/qpid/java/test-profiles/XAExcludes b/qpid/java/test-profiles/XAExcludes
index 1bb26c5f27..907864a730 100644
--- a/qpid/java/test-profiles/XAExcludes
+++ b/qpid/java/test-profiles/XAExcludes
@@ -1,3 +1,22 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
org.apache.qpid.test.unit.xa.QueueTest#*
org.apache.qpid.test.unit.xa.TopicTest#*
org.apache.qpid.test.unit.xa.FaultTest#*
diff --git a/qpid/java/test-profiles/cpp.async.excludes b/qpid/java/test-profiles/cpp.async.excludes
index b6479a00ba..d700538345 100644
--- a/qpid/java/test-profiles/cpp.async.excludes
+++ b/qpid/java/test-profiles/cpp.async.excludes
@@ -1,2 +1,21 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
// the C++ broker doesn't guarantee the order of messages on recovery
org.apache.qpid.test.unit.xa.TopicTest#testMultiMessagesDurSubCrash
diff --git a/qpid/java/test-profiles/cpp.async.testprofile b/qpid/java/test-profiles/cpp.async.testprofile
index ac8b98471e..c3d47f0ce6 100644
--- a/qpid/java/test-profiles/cpp.async.testprofile
+++ b/qpid/java/test-profiles/cpp.async.testprofile
@@ -1,3 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
include=cpp
profile.excludes=CPPPrefetchExcludes
broker.modules=--load-module ${broker.module.store}
diff --git a/qpid/java/test-profiles/cpp.cluster.testprofile b/qpid/java/test-profiles/cpp.cluster.testprofile
index 4bfd4f69a2..22a082e85b 100644
--- a/qpid/java/test-profiles/cpp.cluster.testprofile
+++ b/qpid/java/test-profiles/cpp.cluster.testprofile
@@ -1,3 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
include=cpp
broker.modules=--load-module ${broker.module.cluster} --cluster-name cpp-java-test-cluster
diff --git a/qpid/java/test-profiles/cpp.excludes b/qpid/java/test-profiles/cpp.excludes
index 64417a0edc..c8fae1797e 100644
--- a/qpid/java/test-profiles/cpp.excludes
+++ b/qpid/java/test-profiles/cpp.excludes
@@ -1,3 +1,22 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
//======================================================================
//Exclude the following tests when running all cpp test profilies
//======================================================================
diff --git a/qpid/java/test-profiles/cpp.noprefetch.testprofile b/qpid/java/test-profiles/cpp.noprefetch.testprofile
index b43bdd5722..4764cb576b 100644
--- a/qpid/java/test-profiles/cpp.noprefetch.testprofile
+++ b/qpid/java/test-profiles/cpp.noprefetch.testprofile
@@ -1,3 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
include=cpp
profile.excludes=CPPTransientExcludes CPPNoPrefetchExcludes
max_prefetch=0
diff --git a/qpid/java/test-profiles/cpp.ssl.excludes b/qpid/java/test-profiles/cpp.ssl.excludes
index 1828581d55..4d499c57b9 100644
--- a/qpid/java/test-profiles/cpp.ssl.excludes
+++ b/qpid/java/test-profiles/cpp.ssl.excludes
@@ -1 +1,20 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
#org.apache.qpid.test.client.failover.FailoverTest#*
diff --git a/qpid/java/test-profiles/cpp.ssl.testprofile b/qpid/java/test-profiles/cpp.ssl.testprofile
index 9f2581a83a..bf71384835 100644
--- a/qpid/java/test-profiles/cpp.ssl.testprofile
+++ b/qpid/java/test-profiles/cpp.ssl.testprofile
@@ -1,3 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
include=cpp
broker.modules=--load-module ${broker.module.ssl} --ssl-cert-name localhost.localdomain --ssl-cert-password-file ${test.profiles}/test_resources/ssl/pfile --ssl-cert-db ${test.profiles}/test_resources/ssl/server_db/ --ssl-require-client-authentication --ssl-port @SSL_PORT
diff --git a/qpid/java/test-profiles/cpp.testprofile b/qpid/java/test-profiles/cpp.testprofile
index f6d8d6f353..694e22f48c 100644
--- a/qpid/java/test-profiles/cpp.testprofile
+++ b/qpid/java/test-profiles/cpp.testprofile
@@ -1,3 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
broker.version=0-10
broker.language=cpp
diff --git a/qpid/java/test-profiles/default.0.10.testprofile b/qpid/java/test-profiles/default.0.10.testprofile
index 2cec26d632..d14042f786 100644
--- a/qpid/java/test-profiles/default.0.10.testprofile
+++ b/qpid/java/test-profiles/default.0.10.testprofile
@@ -3,3 +3,5 @@ broker.version=0-10
qpid.amqp.version=0-10
amqj.protocolprovider.class=org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory
profile.excludes=JavaTransientExcludes JavaInVMExcludes Java010Excludes
+amqj.protocol.debug=true
+#qpid.transport.outgoing=org.apache.qpid.transport.network.io.IoNetworkTransport \ No newline at end of file
diff --git a/qpid/java/test-profiles/default.testprofile b/qpid/java/test-profiles/default.testprofile
index 9612aabf5f..2b5c552504 100644
--- a/qpid/java/test-profiles/default.testprofile
+++ b/qpid/java/test-profiles/default.testprofile
@@ -1,3 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
java.naming.factory.initial=org.apache.qpid.jndi.PropertiesFileInitialContextFactory
java.naming.provider.url=${test.profiles}/test-provider.properties
@@ -41,4 +59,5 @@ haltonfailure=no
haltonerror=no
exclude.modules=none
-profile.clustered=false \ No newline at end of file
+profile.clustered=false
+amqj.protocol.debug=true
diff --git a/qpid/java/test-profiles/java-derby.0.10.testprofile b/qpid/java/test-profiles/java-derby.0.10.testprofile
index 8c53a9423a..ca9115d30d 100644
--- a/qpid/java/test-profiles/java-derby.0.10.testprofile
+++ b/qpid/java/test-profiles/java-derby.0.10.testprofile
@@ -1,3 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
broker.language=java
broker.version=0-10
broker=${project.root}/build/bin/qpid-server -p @PORT -m @MPORT -c @CONFIG_FILE -l ${test.profiles}/log4j-test.xml
diff --git a/qpid/java/test-profiles/java-derby.testprofile b/qpid/java/test-profiles/java-derby.testprofile
index a88f2d852d..d22e35f07e 100644
--- a/qpid/java/test-profiles/java-derby.testprofile
+++ b/qpid/java/test-profiles/java-derby.testprofile
@@ -1,3 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
broker.language=java
broker=${project.root}/build/bin/qpid-server -p @PORT -m @MPORT --exclude-0-10 @PORT -c @CONFIG_FILE -l ${test.profiles}/log4j-test.xml
broker.clean=${test.profiles}/clean-dir ${build.data} ${project.root}/build/work
diff --git a/qpid/java/test-profiles/java.0.10.testprofile b/qpid/java/test-profiles/java.0.10.testprofile
index eb615d80d9..a1743eb020 100644
--- a/qpid/java/test-profiles/java.0.10.testprofile
+++ b/qpid/java/test-profiles/java.0.10.testprofile
@@ -1,3 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
broker.language=java
broker.version=0-10
broker=${project.root}/build/bin/qpid-server -p @PORT -m @MPORT -c @CONFIG_FILE -l ${test.profiles}/log4j-test.xml
diff --git a/qpid/java/test-profiles/java.testprofile b/qpid/java/test-profiles/java.testprofile
index c7d6725d68..c8c776d3e1 100644
--- a/qpid/java/test-profiles/java.testprofile
+++ b/qpid/java/test-profiles/java.testprofile
@@ -1,3 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
broker.language=java
broker=${project.root}/build/bin/qpid-server -p @PORT -m @MPORT --exclude-0-10 @PORT -c @CONFIG_FILE -l ${test.profiles}/log4j-test.xml
broker.clean=${test.profiles}/clean-dir ${build.data} ${project.root}/build/work
diff --git a/qpid/java/test-profiles/test_resources/ssl/app1.crt b/qpid/java/test-profiles/test_resources/ssl/app1.crt
index 52004f4dd1..c04b07fce8 100644
--- a/qpid/java/test-profiles/test_resources/ssl/app1.crt
+++ b/qpid/java/test-profiles/test_resources/ssl/app1.crt
@@ -1,14 +1,15 @@
-----BEGIN CERTIFICATE-----
-MIICGjCCAYOgAwIBAgIFAJFV5bcwDQYJKoZIhvcNAQEFBQAwQTELMAkGA1UEBhMC
+MIICPjCCAaegAwIBAgIFAJJRUVUwDQYJKoZIhvcNAQEFBQAwQTELMAkGA1UEBhMC
Q0ExEDAOBgNVBAgTB09udGFyaW8xDTALBgNVBAoTBEFDTUUxETAPBgNVBAMTCE15
-Um9vdENBMB4XDTEwMDcwNjAyNTQ1OFoXDTEwMTAwNjAyNTQ1OFowYTELMAkGA1UE
-BhMCY2ExCzAJBgNVBAgTAm9uMRAwDgYDVQQHEwd0b3JvbnRvMQ0wCwYDVQQKEwRh
+Um9vdENBMB4XDTEwMTAxNDAyMzM1NloXDTE1MTAxNDAyMzM1NlowYTELMAkGA1UE
+BhMCQ0ExCzAJBgNVBAgTAk9OMRAwDgYDVQQHEwdUb3JvbnRvMQ0wCwYDVQQKEwRh
Y21lMQwwCgYDVQQLEwNhcnQxFjAUBgNVBAMMDWFwcDFAYWNtZS5vcmcwgZ8wDQYJ
-KoZIhvcNAQEBBQADgY0AMIGJAoGBAKHJI2XPwISkvic93ICjk9JdYkGeYR4hNK1N
-JYCgzfNAJDq5xxYPkJWd9kc8+nlT0f//nPUVVPtYzGgw/WV0J01Wp8pOJZRdOzYk
-LjOdaJE8vCoL+EMeYVSEgx2XebEV2l7d7z3gGVFKAmQfEhveNxBWNyJ/o9ELapEF
-y1bsPqhjAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAfhj5E7KYqBLOfbOP1DjM1RQ1
-unG/yEbpN+hk0QNN7FHObSHvRfzSfVrZRxFKvZR8o4yN2RL39jkWsq92GGFSlQzF
-pqGA7YjR1j4UGkY3xib3Vr1PsDZWqmH3CjxXTdo0Y28LtQ/QMt58c0wcwFwMCONJ
-ynb4emD3n6Pw7GjyTYg=
+KoZIhvcNAQEBBQADgY0AMIGJAoGBAIlYzFnmAsv/Ci4rgp3sWwkFGFYEBwiXx0Xz
+auZ10nrOUz6Ce2FGVQBYFA09zi79iUyn86oLuTY0Kc/1emCZEPkmOW+hw1uk/TxG
+5MqpEOZdsDv4xIqBHgtWv/d3kGubwSS5lia1l6EPvnzHvsQSM//xhkrJaF0fAHx5
+FMkilnvfAgMBAAGjIjAgMAkGA1UdEwQCMAAwEwYDVR0lBAwwCgYIKwYBBQUHAwIw
+DQYJKoZIhvcNAQEFBQADgYEAJ47Q/4/hJMwTTpfcojv9KbZUTrve/wkabUrytNf3
+ogqhaIzgUr+vA9EMBc91Jg1WJC/0VMmTrTEggqrgd/prg4xcyATQOwNR1TiaWC4E
+r3pWEpZZnEJSd4vtcciNFNsbuAt2m4Nc90gPNXKgNoe0+3nuxPLs/TIauwOSDF+I
+oiw=
-----END CERTIFICATE-----
diff --git a/qpid/java/test-profiles/test_resources/ssl/app1.req b/qpid/java/test-profiles/test_resources/ssl/app1.req
index f647ffb6e9..b4f1ff9a2c 100644
--- a/qpid/java/test-profiles/test_resources/ssl/app1.req
+++ b/qpid/java/test-profiles/test_resources/ssl/app1.req
@@ -1,10 +1,10 @@
-----BEGIN NEW CERTIFICATE REQUEST-----
-MIIBoTCCAQoCAQAwYTELMAkGA1UEBhMCY2ExCzAJBgNVBAgTAm9uMRAwDgYDVQQHEwd0b3JvbnRv
+MIIBoTCCAQoCAQAwYTELMAkGA1UEBhMCQ0ExCzAJBgNVBAgTAk9OMRAwDgYDVQQHEwdUb3JvbnRv
MQ0wCwYDVQQKEwRhY21lMQwwCgYDVQQLEwNhcnQxFjAUBgNVBAMMDWFwcDFAYWNtZS5vcmcwgZ8w
-DQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAKHJI2XPwISkvic93ICjk9JdYkGeYR4hNK1NJYCgzfNA
-JDq5xxYPkJWd9kc8+nlT0f//nPUVVPtYzGgw/WV0J01Wp8pOJZRdOzYkLjOdaJE8vCoL+EMeYVSE
-gx2XebEV2l7d7z3gGVFKAmQfEhveNxBWNyJ/o9ELapEFy1bsPqhjAgMBAAGgADANBgkqhkiG9w0B
-AQQFAAOBgQAgmtr+de8dmT1zYKOOlMZNh9w9FJ/qsrk0Fj6yC8f1QKv2ZE8de5p62U7PKzbLzDML
-kmiU9qSHzuucH3Za9zprQ/5t9zIffO2kr+OgPIzgwdNPjVfH5SQrZlZHyVI9lC/0Ou9uJPScj3Qm
-B+lQOmY/tP854g+gqX7drBsP4pQHug==
+DQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAIlYzFnmAsv/Ci4rgp3sWwkFGFYEBwiXx0XzauZ10nrO
+Uz6Ce2FGVQBYFA09zi79iUyn86oLuTY0Kc/1emCZEPkmOW+hw1uk/TxG5MqpEOZdsDv4xIqBHgtW
+v/d3kGubwSS5lia1l6EPvnzHvsQSM//xhkrJaF0fAHx5FMkilnvfAgMBAAGgADANBgkqhkiG9w0B
+AQQFAAOBgQADKx89mTCGIbrCE6lICLYDexGxexeaZaUDq7YgtyXVIs2wcVGcZJGolUARopMWgE+y
+ryHTC4nvNCaBULyXGrzwPfzTJaVBiY4V5BoTrmz6Ofd73ZO6ZYNhy9bVLrb5VtDyldCj0EWz2lBe
++OzVUeII5KPopRtzXpMH3sB2OredUg==
-----END NEW CERTIFICATE REQUEST-----
diff --git a/qpid/java/test-profiles/test_resources/ssl/app2.crt b/qpid/java/test-profiles/test_resources/ssl/app2.crt
index 641e2c89e1..5c889a4c31 100644
--- a/qpid/java/test-profiles/test_resources/ssl/app2.crt
+++ b/qpid/java/test-profiles/test_resources/ssl/app2.crt
@@ -1,14 +1,15 @@
-----BEGIN CERTIFICATE-----
-MIICGjCCAYOgAwIBAgIFAJFV5aIwDQYJKoZIhvcNAQEFBQAwQTELMAkGA1UEBhMC
+MIICPjCCAaegAwIBAgIFAJJRUXgwDQYJKoZIhvcNAQEFBQAwQTELMAkGA1UEBhMC
Q0ExEDAOBgNVBAgTB09udGFyaW8xDTALBgNVBAoTBEFDTUUxETAPBgNVBAMTCE15
-Um9vdENBMB4XDTEwMDcwNjAyNTQ0N1oXDTEwMTAwNjAyNTQ0N1owYTELMAkGA1UE
-BhMCY2ExCzAJBgNVBAgTAm9uMRAwDgYDVQQHEwd0b3JvbnRvMQ0wCwYDVQQKEwRh
+Um9vdENBMB4XDTEwMTAxNDAyMzQxNVoXDTE1MTAxNDAyMzQxNVowYTELMAkGA1UE
+BhMCQ0ExCzAJBgNVBAgTAk9OMRAwDgYDVQQHEwdUb3JvbnRvMQ0wCwYDVQQKEwRh
Y21lMQwwCgYDVQQLEwNhcnQxFjAUBgNVBAMMDWFwcDJAYWNtZS5vcmcwgZ8wDQYJ
-KoZIhvcNAQEBBQADgY0AMIGJAoGBAMqo9Z/4mDtK9/NpIMNa7h91aUIYNClV36V7
-iHFzxGnw6ubWb6FB6uEO2KFnjk+Jd0sUEbZI3OCjltbfqGBv8UDZ+3+vMF4HrGJ9
-+5YilFbhvZ8FGWCFjjh9gV0W0ptfsskcw0KCxmeHGHP8RbHoKS4Y79D2bkW1ovO6
-FzFx3uRfAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAXn3f8znVyItIDcC/4zjLczP8
-EbKEpRW656HccDTGygsfK+epyA8CO8RAtddW7epd1z0FCWakd2078pBe225w8/gA
-PQDLlfi1vgAxwhh7xZz1UvtkT9scU/GTdmgg5lZYDBeCDVJ3kuY3t5yg47L3Xuwe
-WutGKNQMrJlUfFUNG70=
+KoZIhvcNAQEBBQADgY0AMIGJAoGBAJcIo3TSYxDa1OfmnDEP4qzLxmgyXC3n0Evu
+2nJz0s5zljjItnwJ9UpOkYh/PQcpUWoM+qKeZYadXbGhp8M8nMrJtUPOAKgDmF6A
+DKS9WL7u8kVCcEvBzLRD7bftEm2IPaRu72wOQai76hj11rYWHHkdAPem+C4ODqVn
+y2NN3zDnAgMBAAGjIjAgMAkGA1UdEwQCMAAwEwYDVR0lBAwwCgYIKwYBBQUHAwIw
+DQYJKoZIhvcNAQEFBQADgYEAc5FG8sDbK+i1703rJEwjJ9dCVXljN2jYL1sGXO2o
+9O5Da0zKcQ+OMhLJUoJf38pJw+maYhtT0fKFLItXP/rlyWlaGRBjkcZjOZ2D/Hg5
+/8pEVwiyTYRoEnGKRawnedIbEyBAcgtnlbkTFWXtQmnmgVApSzTpALRn5/jUC1PU
+Y3g=
-----END CERTIFICATE-----
diff --git a/qpid/java/test-profiles/test_resources/ssl/app2.req b/qpid/java/test-profiles/test_resources/ssl/app2.req
index 52d1a7b8e0..53f3494168 100644
--- a/qpid/java/test-profiles/test_resources/ssl/app2.req
+++ b/qpid/java/test-profiles/test_resources/ssl/app2.req
@@ -1,10 +1,10 @@
-----BEGIN NEW CERTIFICATE REQUEST-----
-MIIBoTCCAQoCAQAwYTELMAkGA1UEBhMCY2ExCzAJBgNVBAgTAm9uMRAwDgYDVQQHEwd0b3JvbnRv
+MIIBoTCCAQoCAQAwYTELMAkGA1UEBhMCQ0ExCzAJBgNVBAgTAk9OMRAwDgYDVQQHEwdUb3JvbnRv
MQ0wCwYDVQQKEwRhY21lMQwwCgYDVQQLEwNhcnQxFjAUBgNVBAMMDWFwcDJAYWNtZS5vcmcwgZ8w
-DQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAMqo9Z/4mDtK9/NpIMNa7h91aUIYNClV36V7iHFzxGnw
-6ubWb6FB6uEO2KFnjk+Jd0sUEbZI3OCjltbfqGBv8UDZ+3+vMF4HrGJ9+5YilFbhvZ8FGWCFjjh9
-gV0W0ptfsskcw0KCxmeHGHP8RbHoKS4Y79D2bkW1ovO6FzFx3uRfAgMBAAGgADANBgkqhkiG9w0B
-AQQFAAOBgQC3rWDpHak7fbBf+FvdaqxEoIw+g43RsaDqdGX9ZJJ9ybDi50Xy/YzLiP5vRl3XU8mI
-EoqN8ioZl83UXh95Lb6eW/S+ELgiwQh8npblRGpd/IobdKjEAKV1+i3reYqpsYI5L/8JNbcyIT4A
-QOTc9itCc7O+klJzkmLqqpmlHhYX5A==
+DQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAJcIo3TSYxDa1OfmnDEP4qzLxmgyXC3n0Evu2nJz0s5z
+ljjItnwJ9UpOkYh/PQcpUWoM+qKeZYadXbGhp8M8nMrJtUPOAKgDmF6ADKS9WL7u8kVCcEvBzLRD
+7bftEm2IPaRu72wOQai76hj11rYWHHkdAPem+C4ODqVny2NN3zDnAgMBAAGgADANBgkqhkiG9w0B
+AQQFAAOBgQAGNtSvXwdyujmMTaVQj2M2jZkgnVFtMBjDDmdz+wgzu8fKaej7e+fJi5owf31wJUUP
+0Zi/6mBNj+blmqHjNQ9U3w9Rns0z3+1DbO3Yj48d75IuxQJJd+lXXjCFi2qSBhaNUwyOpzaI1AQo
+JJTC1/WMaPENU9bgYYsOrmIhnbt5rQ==
-----END NEW CERTIFICATE REQUEST-----
diff --git a/qpid/java/test-profiles/test_resources/ssl/keystore.jks b/qpid/java/test-profiles/test_resources/ssl/keystore.jks
index 5e0c2451e8..e3a850a248 100644
--- a/qpid/java/test-profiles/test_resources/ssl/keystore.jks
+++ b/qpid/java/test-profiles/test_resources/ssl/keystore.jks
Binary files differ
diff --git a/qpid/java/testkit/bin/qpid-python-testkit b/qpid/java/tools/bin/qpid-python-testkit
index 2c1d015281..cbe7972421 100755
--- a/qpid/java/testkit/bin/qpid-python-testkit
+++ b/qpid/java/tools/bin/qpid-python-testkit
@@ -22,9 +22,9 @@
# via the python test runner. The defaults are set for a running
# from an svn checkout
-. ./setenv.sh
+. ./set-testkit-env.sh
-export PYTHONPATH=../:$PYTHONPATH
+export PYTHONPATH=./:$PYTHONPATH
rm -rf $OUTDIR
-$PYTHON_DIR/qpid-python-test -DOUTDIR=$OUTDIR -m testkit "$@"
+qpid-python-test -DOUTDIR=$OUTDIR -m testkit "$@"
diff --git a/qpid/java/testkit/bin/setenv.sh b/qpid/java/tools/bin/set-testkit-env.sh
index e6a726eef1..051dad8179 100644
--- a/qpid/java/testkit/bin/setenv.sh
+++ b/qpid/java/tools/bin/set-testkit-env.sh
@@ -62,11 +62,11 @@ fi
if [ "$STORE_LIB" = "" ] ; then
if [ -e /usr/lib64/qpid/daemon/msgstore.so ] ; then
- CLUSTER_LIB="/usr/lib64/qpid/daemon/msgstore.so"
+ STORE_LIB="/usr/lib64/qpid/daemon/msgstore.so"
elif [ -e /usr/lib/qpid/daemon/msgstore.so ] ; then
- CLUSTER_LIB="/usr/lib/qpid/daemon/msgstore.so"
- else
- echo "WARNING: skipping test, STORE_LIB not set and msgstore.so not found."; exit 0;
+ STORE_LIB="/usr/lib/qpid/daemon/msgstore.so"
+ #else
+ # echo "WARNING: skipping test, STORE_LIB not set and msgstore.so not found."; exit 0;
fi
fi
@@ -82,7 +82,7 @@ if [ "$QP_CP" = "" ] ; then
fi
if [ "$OUTDIR" = "" ] ; then
- OUTDIR=`abs_path "../output"`
+ OUTDIR=`abs_path "./output"`
fi
export PYTHONPATH PYTHON_DIR QPIDD_EXEC CLUSTER_LIB QP_CP OUTDIR
diff --git a/qpid/java/tools/bin/testkit.py b/qpid/java/tools/bin/testkit.py
new file mode 100755
index 0000000000..1c2ad598b8
--- /dev/null
+++ b/qpid/java/tools/bin/testkit.py
@@ -0,0 +1,278 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import time, string, traceback
+from brokertest import *
+from qpid.messaging import *
+
+
+try:
+ import java.lang.System
+ _cp = java.lang.System.getProperty("java.class.path");
+except ImportError:
+ _cp = checkenv("QP_CP")
+
+class Formatter:
+
+ def __init__(self, message):
+ self.message = message
+ self.environ = {"M": self.message,
+ "P": self.message.properties,
+ "C": self.message.content}
+
+ def __getitem__(self, st):
+ return eval(st, self.environ)
+
+# The base test case has support for launching the generic
+# receiver and sender through the TestLauncher with all the options.
+#
+class JavaClientTest(BrokerTest):
+ """Base Case for Java Test cases"""
+
+ client_class = "org.apache.qpid.testkit.TestLauncher"
+
+ # currently there is no transparent reconnection.
+ # temp hack: just creating the queue here and closing it.
+ def start_error_watcher(self,broker=None):
+ ssn = broker.connect().session()
+ err_watcher = ssn.receiver("control; {create:always}", capacity=1)
+ ssn.close()
+
+ def store_module_args(self):
+ if BrokerTest.store_lib:
+ return ["--load-module", BrokerTest.store_lib]
+ else:
+ print "Store module not present."
+ return [""]
+
+ def client(self,**options):
+ cmd = ["java","-cp",_cp]
+
+ cmd += ["-Dtest_name=" + options.get("test_name", "UNKNOWN")]
+ cmd += ["-Dhost=" + options.get("host","127.0.0.1")]
+ cmd += ["-Dport=" + str(options.get("port",5672))]
+ cmd += ["-Dcon_count=" + str(options.get("con_count",1))]
+ cmd += ["-Dssn_per_con=" + str(options.get("ssn_per_con",1))]
+ cmd += ["-Duse_unique_dests=" + str(options.get("use_unique_dests",False))]
+ cmd += ["-Dcheck_for_dups=" + str(options.get("check_for_dups",False))]
+ cmd += ["-Ddurable=" + str(options.get("durable",False))]
+ cmd += ["-Dtransacted=" + str(options.get("transacted",False))]
+ cmd += ["-Dreceiver=" + str(options.get("receiver",False))]
+ cmd += ["-Dsync_rcv=" + str(options.get("sync_rcv",False))]
+ cmd += ["-Dsender=" + str(options.get("sender",False))]
+ cmd += ["-Dmsg_size=" + str(options.get("msg_size",256))]
+ cmd += ["-Dtx_size=" + str(options.get("tx_size",10))]
+ cmd += ["-Dmsg_count=" + str(options.get("msg_count",1000))]
+ cmd += ["-Dmax_prefetch=" + str(options.get("max_prefetch",500))]
+ cmd += ["-Dsync_ack=" + str(options.get("sync_ack",False))]
+ cmd += ["-Dsync_persistence=" + str(options.get("sync_pub",False))]
+ cmd += ["-Dsleep_time=" + str(options.get("sleep_time",1000))]
+ cmd += ["-Dfailover=" + options.get("failover", "failover_exchange")]
+ cmd += ["-Djms_durable_sub=" + str(options.get("jms_durable_sub", False))]
+ cmd += ["-Dlog.level=" + options.get("log.level", "warn")]
+ cmd += [self.client_class]
+ cmd += [options.get("address", "my_queue; {create: always}")]
+
+ #print str(options.get("port",5672))
+ return cmd
+
+ # currently there is no transparent reconnection.
+ # temp hack: just creating a receiver and closing session soon after.
+ def monitor_clients(self,broker=None,run_time=600,error_ck_freq=60):
+ ssn = broker.connect().session()
+ err_watcher = ssn.receiver("control; {create:always}", capacity=1)
+ i = run_time/error_ck_freq
+ is_error = False
+ for j in range(i):
+ not_empty = True
+ while not_empty:
+ try:
+ m = err_watcher.fetch(timeout=error_ck_freq)
+ ssn.acknowledge()
+ print "Java process notified of an error"
+ self.print_error(m)
+ is_error = True
+ except messaging.Empty, e:
+ not_empty = False
+
+ ssn.close()
+ return is_error
+
+ def print_error(self,msg):
+ print msg.properties.get("exception-trace")
+
+ def verify(self, receiver,sender):
+ sender_running = receiver.is_running()
+ receiver_running = sender.is_running()
+
+ self.assertTrue(receiver_running,"Receiver has exited prematually")
+ self.assertTrue(sender_running,"Sender has exited prematually")
+
+ def start_sender_and_receiver(self,**options):
+
+ receiver_opts = options
+ receiver_opts["receiver"]=True
+ receiver = self.popen(self.client(**receiver_opts),
+ expect=EXPECT_RUNNING)
+
+ sender_opts = options
+ sender_opts["sender"]=True
+ sender = self.popen(self.client(**sender_opts),
+ expect=EXPECT_RUNNING)
+
+ return receiver, sender
+
+ def start_cluster(self,count=2,expect=EXPECT_RUNNING,**options):
+ if options.get("durable",False)==True:
+ cluster = Cluster(self, count=count, expect=expect, args=self.store_module_args())
+ else:
+ cluster = Cluster(self, count=count)
+ return cluster
+
+class ConcurrencyTest(JavaClientTest):
+ """A concurrency test suite for the JMS client"""
+ skip = False
+
+ def base_case(self,**options):
+ if self.skip :
+ print "Skipping test"
+ return
+
+ cluster = self.start_cluster(count=2,**options)
+ self.start_error_watcher(broker=cluster[0])
+ options["port"] = port=cluster[0].port()
+
+ options["use_unique_dests"]=True
+ options["address"]="amq.topic"
+ receiver, sender = self.start_sender_and_receiver(**options)
+ self.monitor_clients(broker=cluster[0],run_time=180)
+ self.verify(receiver,sender)
+
+ def test_multiplexing_con(self):
+ """Tests multiple sessions on a single connection"""
+
+ self.base_case(ssn_per_con=25,test_name=self.id())
+
+ def test_multiplexing_con_with_tx(self):
+ """Tests multiple transacted sessions on a single connection"""
+
+ self.base_case(ssn_per_con=25,transacted=True,test_name=self.id())
+
+ def test_multiplexing_con_with_sync_rcv(self):
+ """Tests multiple sessions with sync receive"""
+
+ self.base_case(ssn_per_con=25,sync_rcv=True,test_name=self.id())
+
+ def test_multiplexing_con_with_durable_sub(self):
+ """Tests multiple sessions with durable subs"""
+
+ self.base_case(ssn_per_con=25,durable=True,jms_durable_sub=True,test_name=self.id())
+
+ def test_multiplexing_con_with_sync_ack(self):
+ """Tests multiple sessions with sync ack"""
+
+ self.base_case(ssn_per_con=25,sync_ack=True,test_name=self.id())
+
+ def test_multiplexing_con_with_sync_pub(self):
+ """Tests multiple sessions with sync pub"""
+
+ self.base_case(ssn_per_con=25,sync_pub=True,durable=True,test_name=self.id())
+
+ def test_multiple_cons_and_ssns(self):
+ """Tests multiple connections and sessions"""
+
+ self.base_case(con_count=10,ssn_per_con=25,test_name=self.id())
+
+
+class SoakTest(JavaClientTest):
+ """A soak test suite for the JMS client"""
+
+ def base_case(self,**options):
+ cluster = self.start_cluster(count=4, expect=EXPECT_EXIT_FAIL,**options)
+ options["port"] = port=cluster[0].port()
+ self.start_error_watcher(broker=cluster[0])
+ options["use_unique_dests"]=True
+ options["address"]="amq.topic"
+ receiver,sender = self.start_sender_and_receiver(**options)
+ is_error = self.monitor_clients(broker=cluster[0],run_time=30,error_ck_freq=30)
+
+ if (is_error):
+ print "The sender or receiver didn't start properly. Exiting test."
+ return
+ else:
+ "Print no error !"
+
+ # grace period for java clients to get the failover properly setup.
+ time.sleep(30)
+ error_msg= None
+ # Kill original brokers, start new ones.
+ try:
+ for i in range(8):
+ cluster[i].kill()
+ b=cluster.start()
+ self.monitor_clients(broker=b,run_time=30,error_ck_freq=30)
+ print "iteration : " + str(i)
+ except ConnectError, e1:
+ error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1)
+
+ except SessionError, e2:
+ error_msg = "Session error while connected to new cluster node : " + traceback.format_exc(e2)
+
+ self.verify(receiver,sender)
+ if error_msg:
+ raise Exception(error_msg)
+
+
+ def test_failover(self) :
+ """Test basic failover"""
+
+ self.base_case(test_name=self.id())
+
+
+ def test_failover_with_durablesub(self):
+ """Test failover with durable subscriber"""
+
+ self.base_case(durable=True,jms_durable_sub=True,test_name=self.id())
+
+
+ def test_failover_with_sync_rcv(self):
+ """Test failover with sync receive"""
+
+ self.base_case(sync_rcv=True,test_name=self.id())
+
+
+ def test_failover_with_sync_ack(self):
+ """Test failover with sync ack"""
+
+ self.base_case(sync_ack=True,test_name=self.id())
+
+
+ def test_failover_with_noprefetch(self):
+ """Test failover with no prefetch"""
+
+ self.base_case(max_prefetch=1,test_name=self.id())
+
+
+ def test_failover_with_multiple_cons_and_ssns(self):
+ """Test failover with multiple connections and sessions"""
+
+ self.base_case(use_unique_dests=True,address="amq.topic",
+ con_count=10,ssn_per_con=25,test_name=self.id())
diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java
index b10129d855..b10129d855 100644
--- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java
diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
index dbc73c404f..dbc73c404f 100644
--- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java
new file mode 100644
index 0000000000..b4294ee4cc
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java
@@ -0,0 +1,216 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQAnyDestination;
+import org.apache.qpid.client.AMQConnection;
+
+/**
+ * A generic receiver which consumes messages
+ * from a given address in a broker (host/port)
+ * until told to stop by killing it.
+ *
+ * It participates in a feedback loop to ensure the producer
+ * doesn't fill up the queue. If it receives an "End" msg
+ * it sends a reply to the replyTo address in that msg.
+ *
+ * It doesn't check for correctness or measure anything
+ * leaving those concerns to another entity.
+ * However it prints a timestamp every x secs(-Dreport_frequency)
+ * as checkpoint to figure out how far the test has progressed if
+ * a failure occurred.
+ *
+ * It also takes in an optional Error handler to
+ * pass out any error in addition to writing them to std err.
+ *
+ * This is intended more as building block to create
+ * more complex test cases. However there is a main method
+ * provided to use this standalone.
+ *
+ * The following options are available and configurable
+ * via jvm args.
+ *
+ * sync_rcv - Whether to consume sync (instead of using a listener).
+ * report_frequency - how often a timestamp is printed
+ * durable
+ * transacted
+ * tx_size - size of transaction batch in # msgs. *
+ * check_for_dups - check for duplicate messages and out of order messages.
+ * jms_durable_sub - create a durable subscription instead of a regular subscription.
+ */
+public class Receiver extends Client implements MessageListener
+{
+ long msg_count = 0;
+ int sequence = 0;
+ boolean syncRcv = Boolean.getBoolean("sync_rcv");
+ boolean jmsDurableSub = Boolean.getBoolean("jms_durable_sub");
+ boolean checkForDups = Boolean.getBoolean("check_for_dups");
+ MessageConsumer consumer;
+ List<Integer> duplicateMessages = new ArrayList<Integer>();
+
+ public Receiver(Connection con,String addr) throws Exception
+ {
+ super(con);
+ setSsn(con.createSession(isTransacted(), getAck_mode()));
+ consumer = getSsn().createConsumer(new AMQAnyDestination(addr));
+ if (!syncRcv)
+ {
+ consumer.setMessageListener(this);
+ }
+
+ System.out.println("Receiving messages from : " + addr);
+ }
+
+ public void onMessage(Message msg)
+ {
+ handleMessage(msg);
+ }
+
+ public void run() throws Exception
+ {
+ long sleepTime = getReportFrequency();
+ while(true)
+ {
+ if(syncRcv)
+ {
+ long t = sleepTime;
+ while (t > 0)
+ {
+ long start = System.currentTimeMillis();
+ Message msg = consumer.receive(t);
+ t = t - (System.currentTimeMillis() - start);
+ handleMessage(msg);
+ }
+ }
+ Thread.sleep(sleepTime);
+ System.out.println(getDf().format(System.currentTimeMillis())
+ + " - messages received : " + msg_count);
+ }
+ }
+
+ private void handleMessage(Message m)
+ {
+ if (m == null) { return; }
+
+ try
+ {
+ if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End"))
+ {
+ MessageProducer temp = getSsn().createProducer(m.getJMSReplyTo());
+ Message controlMsg = getSsn().createTextMessage();
+ temp.send(controlMsg);
+ if (isTransacted())
+ {
+ getSsn().commit();
+ }
+ temp.close();
+ }
+ else
+ {
+
+ int seq = m.getIntProperty("sequence");
+ if (checkForDups)
+ {
+ if (seq == 0)
+ {
+ sequence = 0; // wrap around for each iteration
+ System.out.println("Received " + duplicateMessages.size() + " duplicate messages during the iteration");
+ duplicateMessages.clear();
+ }
+
+ if (seq < sequence)
+ {
+ duplicateMessages.add(seq);
+ }
+ else if (seq == sequence)
+ {
+ sequence++;
+ msg_count ++;
+ }
+ else
+ {
+ // Multiple publishers are not allowed in this test case.
+ // So out of order messages are not allowed.
+ throw new Exception(": Received an out of order message (expected="
+ + sequence + ",received=" + seq + ")" );
+ }
+ }
+ else
+ {
+ msg_count ++;
+ }
+
+ // Please note that this test case doesn't expect duplicates
+ // When testing for transactions.
+ if (isTransacted() && msg_count % getTxSize() == 0)
+ {
+ getSsn().commit();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ handleError("Exception receiving messages",e);
+ }
+ }
+
+ // Receiver host port address
+ public static void main(String[] args) throws Exception
+ {
+ String host = "127.0.0.1";
+ int port = 5672;
+ String addr = "message_queue";
+
+ if (args.length > 0)
+ {
+ host = args[0];
+ }
+ if (args.length > 1)
+ {
+ port = Integer.parseInt(args[1]);
+ }
+ if (args.length > 2)
+ {
+ addr = args[2];
+ }
+
+ AMQConnection con = new AMQConnection(
+ "amqp://username:password@topicClientid/test?brokerlist='tcp://"
+ + host + ":" + port + "'");
+
+ Receiver rcv = new Receiver(con,addr);
+ rcv.run();
+ }
+
+}
diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java
index 14b9b7302f..14b9b7302f 100644
--- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java
new file mode 100644
index 0000000000..36ae7cad42
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java
@@ -0,0 +1,370 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit;
+
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.text.DateFormat;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.qpid.client.AMQAnyDestination;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.thread.Threading;
+
+/**
+ * A basic test case class that could launch a Sender/Receiver
+ * or both, each on it's own separate thread.
+ *
+ * If con_count == ssn_count, then each entity created will have
+ * it's own Connection. Else if con_count < ssn_count, then
+ * a connection will be shared by ssn_count/con_count # of entities.
+ *
+ * The if both sender and receiver options are set, it will
+ * share a connection.
+ *
+ * The following options are available as jvm args
+ * host, port
+ * con_count,ssn_count
+ * con_idle_time - which determines heartbeat
+ * sender, receiver - booleans which indicate which entity to create.
+ * Setting them both is also a valid option.
+ */
+public class TestLauncher implements ErrorHandler
+{
+ protected String host = "127.0.0.1";
+ protected int port = 5672;
+ protected int sessions_per_con = 1;
+ protected int connection_count = 1;
+ protected long heartbeat = 5000;
+ protected boolean sender = false;
+ protected boolean receiver = false;
+ protected boolean useUniqueDests = false;
+ protected String url;
+
+ protected String address = "my_queue; {create: always}";
+ protected boolean durable = false;
+ protected String failover = "";
+ protected AMQConnection controlCon;
+ protected Destination controlDest = null;
+ protected Session controlSession = null;
+ protected MessageProducer statusSender;
+ protected List<AMQConnection> clients = new ArrayList<AMQConnection>();
+ protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
+ protected NumberFormat nf = new DecimalFormat("##.00");
+ protected String testName;
+
+ public TestLauncher()
+ {
+ testName = System.getProperty("test_name","UNKNOWN");
+ host = System.getProperty("host", "127.0.0.1");
+ port = Integer.getInteger("port", 5672);
+ sessions_per_con = Integer.getInteger("ssn_per_con", 1);
+ connection_count = Integer.getInteger("con_count", 1);
+ heartbeat = Long.getLong("heartbeat", 5);
+ sender = Boolean.getBoolean("sender");
+ receiver = Boolean.getBoolean("receiver");
+ useUniqueDests = Boolean.getBoolean("use_unique_dests");
+
+ failover = System.getProperty("failover", "");
+ durable = Boolean.getBoolean("durable");
+
+ url = "amqp://username:password@topicClientid/test?brokerlist='tcp://"
+ + host + ":" + port + "?heartbeat='" + heartbeat+ "''";
+
+ if (failover.equalsIgnoreCase("failover_exchange"))
+ {
+ url += "&failover='failover_exchange'";
+
+ System.out.println("Failover exchange " + url );
+ }
+
+ configureLogging();
+ }
+
+ protected void configureLogging()
+ {
+ PatternLayout layout = new PatternLayout();
+ layout.setConversionPattern("%t %d %p [%c{4}] %m%n");
+ BasicConfigurator.configure(new ConsoleAppender(layout));
+
+ String logLevel = System.getProperty("log.level","warn");
+ String logComponent = System.getProperty("log.comp","org.apache.qpid");
+
+ Logger logger = Logger.getLogger(logComponent);
+ logger.setLevel(Level.toLevel(logLevel, Level.WARN));
+
+ System.out.println("Level " + logger.getLevel());
+
+ }
+
+ public void setUpControlChannel()
+ {
+ try
+ {
+ controlCon = new AMQConnection(url);
+ controlCon.start();
+
+ controlDest = new AMQAnyDestination("control; {create: always}"); // durable
+
+ // Create the session to setup the messages
+ controlSession = controlCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ statusSender = controlSession.createProducer(controlDest);
+
+ }
+ catch (Exception e)
+ {
+ handleError("Error while setting up the test",e);
+ }
+ }
+
+ public void cleanup()
+ {
+ try
+ {
+ controlSession.close();
+ controlCon.close();
+ for (AMQConnection con : clients)
+ {
+ con.close();
+ }
+ }
+ catch (Exception e)
+ {
+ handleError("Error while tearing down the test",e);
+ }
+ }
+
+ public void start(String addr)
+ {
+ try
+ {
+ if (addr == null)
+ {
+ addr = address;
+ }
+
+ int ssn_per_con = sessions_per_con;
+ String addrTemp = addr;
+ for (int i = 0; i< connection_count; i++)
+ {
+ AMQConnection con = new AMQConnection(url);
+ con.start();
+ clients.add(con);
+ for (int j = 0; j< ssn_per_con; j++)
+ {
+ String index = createPrefix(i,j);
+ if (useUniqueDests)
+ {
+ addrTemp = modifySubject(index,addr);
+ }
+
+ if (sender)
+ {
+ createSender(index,con,addrTemp,this);
+ }
+
+ if (receiver)
+ {
+ System.out.println("########## Creating receiver ##################");
+
+ createReceiver(index,con,addrTemp,this);
+ }
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ handleError("Exception while setting up the test",e);
+ }
+
+ }
+
+ protected void createReceiver(String index,final AMQConnection con, final String addr, final ErrorHandler h)
+ {
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ Receiver rcv = new Receiver(con,addr);
+ rcv.setErrorHandler(h);
+ rcv.run();
+ }
+ catch (Exception e)
+ {
+ h.handleError("Error Starting Receiver", e);
+ }
+ }
+ };
+
+ Thread t = null;
+ try
+ {
+ t = Threading.getThreadFactory().newThread(r);
+ }
+ catch(Exception e)
+ {
+ handleError("Error creating Receive thread",e);
+ }
+
+ t.setName("ReceiverThread-" + index);
+ t.start();
+ }
+
+ protected void createSender(String index,final AMQConnection con, final String addr, final ErrorHandler h)
+ {
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ Sender sender = new Sender(con, addr);
+ sender.setErrorHandler(h);
+ sender.run();
+ }
+ catch (Exception e)
+ {
+ h.handleError("Error Starting Sender", e);
+ }
+ }
+ };
+
+ Thread t = null;
+ try
+ {
+ t = Threading.getThreadFactory().newThread(r);
+ }
+ catch(Exception e)
+ {
+ handleError("Error creating Sender thread",e);
+ }
+
+ t.setName("SenderThread-" + index);
+ t.start();
+ }
+
+ public synchronized void handleError(String msg,Exception e)
+ {
+ // In case sending the message fails
+ StringBuilder sb = new StringBuilder();
+ sb.append(msg);
+ sb.append(" @ ");
+ sb.append(df.format(new Date(System.currentTimeMillis())));
+ sb.append(" ");
+ sb.append(e.getMessage());
+ System.err.println(sb.toString());
+ e.printStackTrace();
+
+ try
+ {
+ TextMessage errorMsg = controlSession.createTextMessage();
+ errorMsg.setStringProperty("status", "error");
+ errorMsg.setStringProperty("desc", msg);
+ errorMsg.setStringProperty("time", df.format(new Date(System.currentTimeMillis())));
+ errorMsg.setStringProperty("exception-trace", serializeStackTrace(e));
+
+ System.out.println("Msg " + errorMsg);
+
+ statusSender.send(errorMsg);
+ }
+ catch (JMSException e1)
+ {
+ e1.printStackTrace();
+ }
+ }
+
+ private String serializeStackTrace(Exception e)
+ {
+ ByteArrayOutputStream bOut = new ByteArrayOutputStream();
+ PrintStream printStream = new PrintStream(bOut);
+ e.printStackTrace(printStream);
+ printStream.close();
+ return bOut.toString();
+ }
+
+ private String createPrefix(int i, int j)
+ {
+ return String.valueOf(i).concat(String.valueOf(j));
+ }
+
+ /**
+ * A basic helper function to modify the subjects by
+ * appending an index.
+ */
+ private String modifySubject(String index,String addr)
+ {
+ if (addr.indexOf("/") > 0)
+ {
+ addr = addr.substring(0,addr.indexOf("/")+1) +
+ index +
+ addr.substring(addr.indexOf("/")+1,addr.length());
+ }
+ else if (addr.indexOf(";") > 0)
+ {
+ addr = addr.substring(0,addr.indexOf(";")) +
+ "/" + index +
+ addr.substring(addr.indexOf(";"),addr.length());
+ }
+ else
+ {
+ addr = addr + "/" + index;
+ }
+
+ return addr;
+ }
+
+ public static void main(String[] args)
+ {
+ final TestLauncher test = new TestLauncher();
+ test.setUpControlChannel();
+ System.out.println("args.length " + args.length);
+ System.out.println("args [0] " + args [0]);
+ test.start(args.length > 0 ? args [0] : null);
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() { test.cleanup(); }
+ });
+
+ }
+}