diff options
author | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-12-06 16:05:46 +0000 |
---|---|---|
committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-12-06 16:05:46 +0000 |
commit | 80ba5c5efdc23e3922b8f8f5152ceeaefa6951b6 (patch) | |
tree | 2eb2141eb77d43701e718b5b5ab1cbd07401015f /qpid/java | |
parent | 1d44d6e7a3369fb7773ba50d02c3baa8955da382 (diff) | |
download | qpid-python-grkvlt-network-20101013.tar.gz |
Attempt one at merge from r1021441:HEADgrkvlt-network-20101013
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/grkvlt-network-20101013@1042697 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
123 files changed, 2453 insertions, 792 deletions
diff --git a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/AbstractConfiguration.java b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/AbstractConfiguration.java index a684e52ce4..e8e630842c 100644 --- a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/AbstractConfiguration.java +++ b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/AbstractConfiguration.java @@ -61,7 +61,7 @@ public abstract class AbstractConfiguration implements ConfigurationFile public RuleSet reload() { RuleSet oldRules = _config; - + try { RuleSet newRules = load(); diff --git a/qpid/java/broker-plugins/experimental/info/build.properties b/qpid/java/broker-plugins/experimental/info/build.properties index ca85cb7b66..bdbbe1c2af 100644 --- a/qpid/java/broker-plugins/experimental/info/build.properties +++ b/qpid/java/broker-plugins/experimental/info/build.properties @@ -1,3 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# source.. = src/ output.. = bin/ bin.includes = META-INF/,\ diff --git a/qpid/java/broker-plugins/experimental/shutdown/MANIFEST.MF b/qpid/java/broker-plugins/experimental/shutdown/MANIFEST.MF new file mode 100644 index 0000000000..49e90c6aad --- /dev/null +++ b/qpid/java/broker-plugins/experimental/shutdown/MANIFEST.MF @@ -0,0 +1,15 @@ +Manifest-Version: 1.0 +Bundle-ManifestVersion: 2 +Bundle-Name: Experimental Shutdown +Bundle-Description: Experimental Qpid Broker Shutdown Plugin +Bundle-License: http://www.apache.org/licenses/LICENSE-2.0.txt +Bundle-DocURL: http://qpid.apache.org/ +Bundle-SymbolicName: broker-plugins-experimental-shutdown;singleton:=true +Bundle-Version: 1.0.0 +Bundle-Activator: org.apache.qpid.shutdown.Activator +Import-Package: javax.management;resolution:=optional, + org.apache.log4j, + org.osgi.framework +Bundle-RequiredExecutionEnvironment: J2SE-1.5 +Bundle-ActivationPolicy: lazy + diff --git a/qpid/java/broker-plugins/experimental/shutdown/build.xml b/qpid/java/broker-plugins/experimental/shutdown/build.xml new file mode 100644 index 0000000000..ec4fce374e --- /dev/null +++ b/qpid/java/broker-plugins/experimental/shutdown/build.xml @@ -0,0 +1,32 @@ +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<project name="AMQ Broker Shutdown Plugin" default="build"> + + <property name="module.depends" value="common broker broker-plugins"/> + <property name="module.test.depends" value="test broker/test management/common client systests"/> + <property name="module.manifest" value="MANIFEST.MF"/> + <property name="module.plugin" value="true"/> + + <import file="../../../module.xml"/> + + <target name="bundle" depends="bundle-tasks"/> + +</project> diff --git a/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Activator.java b/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Activator.java new file mode 100644 index 0000000000..ad5e7707b6 --- /dev/null +++ b/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Activator.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.shutdown; + +import java.lang.management.ManagementFactory; + +import javax.management.InstanceNotFoundException; +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.log4j.Logger; +import org.osgi.framework.BundleActivator; +import org.osgi.framework.BundleContext; + +public class Activator implements BundleActivator +{ + private static final Logger _logger = Logger.getLogger(Activator.class); + + private static final String SHUTDOWN_MBEAN_NAME = "org.apache.qpid:type=ShutdownMBean"; + + /** @see org.osgi.framework.BundleActivator#start(org.osgi.framework.BundleContext) */ + public void start(BundleContext ctx) throws Exception { + Shutdown shutdown = new Shutdown(); + if (ctx != null) + { + ctx.registerService(ShutdownMBean.class.getName(), shutdown, null); + } + + // MBean registration + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + ObjectName name = new ObjectName(SHUTDOWN_MBEAN_NAME); + mbs.registerMBean(shutdown, name); + + _logger.info("Shutdown plugin MBean registered"); + } + + /** @see org.osgi.framework.BundleActivator#stop(org.osgi.framework.BundleContext) */ + public void stop(BundleContext ctx) throws Exception + { + // Unregister MBean + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + ObjectName name = new ObjectName(SHUTDOWN_MBEAN_NAME); + try + { + mbs.unregisterMBean(name); + } + catch (InstanceNotFoundException e) + { + //ignore + } + + _logger.info("Shutdown plugin MBean unregistered"); + } +} diff --git a/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Shutdown.java b/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Shutdown.java new file mode 100644 index 0000000000..9a6f85fe9c --- /dev/null +++ b/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Shutdown.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.shutdown; + +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; + +/** + * Implementation of the JMX broker shutdown plugin. + */ +public class Shutdown implements ShutdownMBean +{ + private static final Logger _logger = Logger.getLogger(Shutdown.class); + + private static final String FORMAT = "yyyyy/MM/dd hh:mm:ss"; + private static final int THREAD_COUNT = 1; + private static final ScheduledExecutorService EXECUTOR = new ScheduledThreadPoolExecutor(THREAD_COUNT); + + private final Runnable _shutdown = new SystemExiter(); + + /** @see ShutdownMBean#shutdown() */ + public void shutdown() + { + _logger.info("Shutting down at user's request"); + shutdownBroker(0); + } + + /** @see ShutdownMBean#shutdown(long) */ + public void shutdown(long delay) + { + _logger.info("Scheduled broker shutdown after " + delay + "ms"); + shutdownBroker(delay); + } + + /** @see ShutdownMBean#shutdownAt(String) */ + public void shutdownAt(String when) + { + Date date; + DateFormat df = new SimpleDateFormat(FORMAT); + try + { + date = df.parse(when); + } + catch (ParseException e) + { + _logger.error("Invalid date \"" + when + "\": expecting " + FORMAT, e); + return; + } + _logger.info("Scheduled broker shutdown at " + when); + long now = System.currentTimeMillis(); + long time = date.getTime(); + if (time > now) + { + shutdownBroker(time - now); + } + else + { + shutdownBroker(0); + } + } + + /** + * Submits the {@link SystemExiter} job to shutdown the broker. + */ + private void shutdownBroker(long delay) + { + EXECUTOR.schedule(_shutdown, delay, TimeUnit.MILLISECONDS); + } + + /** + * Shutting down the system in another thread to avoid JMX exceptions being thrown. + */ + class SystemExiter implements Runnable + { + public void run() + { + System.exit(0); + } + } +} diff --git a/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/ShutdownMBean.java b/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/ShutdownMBean.java new file mode 100644 index 0000000000..6294f869e9 --- /dev/null +++ b/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/ShutdownMBean.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.shutdown; + +/** + * Shutdown plugin JMX MBean interface. + * + * Shuts the Qpid broker down via JMX. + */ +public interface ShutdownMBean +{ + /** + * Broker will be shut down immediately. + */ + public void shutdown(); + + /** + * Broker will be shutdown after the specified delay + * + * @param delay the number of ms to wait + */ + public void shutdown(long delay); + + /** + * Broker will be shutdown at the specified date and time. + * + * @param when the date and time to shutdown + */ + public void shutdownAt(String when); +} diff --git a/qpid/java/broker/bin/qpid.start b/qpid/java/broker-plugins/experimental/shutdown/src/main/java/shutdown.bnd index e44b6083ea..6e005f5bdb 100755 --- a/qpid/java/broker/bin/qpid.start +++ b/qpid/java/broker-plugins/experimental/shutdown/src/main/java/shutdown.bnd @@ -1,4 +1,3 @@ -#!/bin/bash # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -18,4 +17,9 @@ # under the License. # -exec qpid-server -run:debug "$@"
\ No newline at end of file +ver: 0.9.0 + +Bundle-SymbolicName: qpid-shutdown-plugin +Bundle-Version: ${ver} +Export-Package: *;version=${ver} +Bundle-RequiredExecutionEnvironment: J2SE-1.5 diff --git a/qpid/java/broker/build.xml b/qpid/java/broker/build.xml index f22972384d..edd71effaa 100644 --- a/qpid/java/broker/build.xml +++ b/qpid/java/broker/build.xml @@ -72,6 +72,12 @@ <fixcrlf srcdir="${module.release}/bin" fixlast="true" eol="dos" includes="*.bat"/> </target> + <target name="release-bin-other" description="copy broker-plugins into module release"> + <copy todir="${module.release}/lib/plugins" failonerror="true"> + <fileset dir="${build.lib}/plugins"/> + </copy> + </target> + <target name="release-bin" depends="release-bin-tasks"/> </project> diff --git a/qpid/java/broker/etc/config.xml b/qpid/java/broker/etc/config.xml index 87101678c0..d9677c9cf6 100644 --- a/qpid/java/broker/etc/config.xml +++ b/qpid/java/broker/etc/config.xml @@ -53,9 +53,10 @@ <enabled>true</enabled> <jmxport>8999</jmxport> <ssl> - <enabled>true</enabled> - <!-- Update below path to your keystore location, eg ${conf}/qpid.keystore --> - <keyStorePath>${prefix}/../test-profiles/test_resources/ssl/keystore.jks</keyStorePath> + <enabled>false</enabled> + <!-- Update below path to your keystore location, or run the bin/create-example-ssl-stores(.sh|.bat) + script from within the etc/ folder to generate an example store with self-signed cert --> + <keyStorePath>${conf}/qpid.keystore</keyStorePath> <keyStorePassword>password</keyStorePassword> </ssl> </management> diff --git a/qpid/java/broker/etc/log4j.xml b/qpid/java/broker/etc/log4j.xml index 381173da15..967b0933b4 100644 --- a/qpid/java/broker/etc/log4j.xml +++ b/qpid/java/broker/etc/log4j.xml @@ -68,7 +68,7 @@ <param name="backupFilesToPath" value="${QPID_WORK}/backup/log"/> <layout class="org.apache.log4j.PatternLayout"> - <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/> + <param name="ConversionPattern" value="%d %-5p [%t] (%F:%L) - %m%n"/> </layout> </appender> @@ -77,13 +77,13 @@ <param name="Append" value="false"/> <layout class="org.apache.log4j.PatternLayout"> - <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/> + <param name="ConversionPattern" value="%d %-5p [%t] (%F:%L) - %m%n"/> </layout> </appender> <appender class="org.apache.log4j.ConsoleAppender" name="STDOUT"> <layout class="org.apache.log4j.PatternLayout"> - <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/> + <param name="ConversionPattern" value="%d %-5p [%t] (%F:%L) - %m%n"/> </layout> </appender> diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerInstance.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerInstance.java index ad4e40a562..8150cd7404 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerInstance.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerInstance.java @@ -255,6 +255,7 @@ public class BrokerInstance private void configureLogging(File logConfigFile, int logWatchTime) throws Exception { + _logger.info("configuring logging using file " + logConfigFile.getName()); if (logConfigFile.exists() && logConfigFile.canRead()) { CurrentActor.get().message(BrokerMessages.LOG_CONFIG(logConfigFile.getAbsolutePath())); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java index 0e03e33be8..7dfe9ff49a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java @@ -29,6 +29,8 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import sun.misc.Unsafe; + public class ConfigStore { private ConcurrentHashMap<ConfigObjectType, ConcurrentHashMap<UUID, ConfiguredObject>> _typeMap = diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java index c4cad1e5c9..18f41588d5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java @@ -229,13 +229,12 @@ public abstract class ConfigurationPlugin return getListValue(property, Collections.<String>emptyList()); } - @SuppressWarnings("unchecked") protected List<String> getListValue(String property, List<String> defaultValue) { - return (List<String>) _configuration.getList(property, defaultValue); + return _configuration.getList(property, defaultValue); } - /// Validation Helpers + // Validation Helpers protected boolean contains(String property) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionConfiguration.java index c5fbb6efd9..7a2632d923 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionConfiguration.java @@ -85,8 +85,5 @@ public class SlowConsumerDetectionConfiguration extends ConfigurationPlugin } } - System.out.println("Configured SCDC"); - System.out.println("Delay:" + getDelay()); - System.out.println("TimeUnit:" + getTimeUnit()); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java index c06305ee4e..caec2c1324 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -57,20 +57,21 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable { connection.close(cause, message); } - catch (AMQException e) + catch (Exception e) { - _logger.warn("Error closing connection:" + e.getMessage()); + _logger.warn("Error closing connection: " + e.getMessage()); + deregisterConnection(connection); } } - public void registerConnection(AMQConnectionModel connnection) + public void registerConnection(AMQConnectionModel connection) { - _registry.add(connnection); + _registry.add(connection); } - public void deregisterConnection(AMQConnectionModel connnection) + public void deregisterConnection(AMQConnectionModel connection) { - _registry.remove(connnection); + _registry.remove(connection); } @Override diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AMQManagedObject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AMQManagedObject.java index c4ffcd26bf..8bce180784 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AMQManagedObject.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AMQManagedObject.java @@ -20,9 +20,13 @@ */ package org.apache.qpid.server.management; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.qpid.server.logging.actors.ManagementActor; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.thread.Threading; import javax.management.ListenerNotFoundException; import javax.management.MBeanInfo; @@ -45,12 +49,13 @@ public abstract class AMQManagedObject extends DefaultManagedObject /** * broadcaster support class */ - protected NotificationBroadcasterSupport _broadcaster = new NotificationBroadcasterSupport(); + protected NotificationBroadcasterSupport _broadcaster = new NotificationBroadcasterSupport( + Executors.newCachedThreadPool(Threading.getThreadFactory())); /** * sequence number for notifications */ - protected long _notificationSequenceNumber = 0; + protected AtomicLong _notificationSequenceNumber = new AtomicLong(0); protected MBeanInfo _mbeanInfo; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageReference.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageReference.java index 055403ff08..399f8f9327 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageReference.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageReference.java @@ -20,13 +20,12 @@ */ package org.apache.qpid.server.message; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.atomic.AtomicBoolean; public abstract class MessageReference<M extends ServerMessage> { - private static final AtomicReferenceFieldUpdater<MessageReference, ServerMessage> _messageUpdater = - AtomicReferenceFieldUpdater.newUpdater(MessageReference.class, ServerMessage.class,"_message"); + private final AtomicBoolean _released = new AtomicBoolean(false); private volatile M _message; @@ -47,10 +46,12 @@ public abstract class MessageReference<M extends ServerMessage> public void release() { - M message = (M) _messageUpdater.getAndSet(this,null); - if(message != null) + if(!_released.getAndSet(true)) { - onRelease(message); + if(_message != null) + { + onRelease(_message); + } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java index b61da12b05..a6bab017a1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java @@ -63,6 +63,7 @@ public class PluginManager implements Closeable private static final Logger _logger = Logger.getLogger(PluginManager.class); private static final int FELIX_STOP_TIMEOUT = 30000; + private static final String QPID_VER_SUFFIX = "version=0.9,"; private Framework _felix; @@ -133,33 +134,33 @@ public class PluginManager implements Closeable "org.osgi.service.startlevel; version=1.0.0," + "org.osgi.service.url; version=1.0.0," + "org.osgi.util.tracker; version=1.0.0," + - "org.apache.qpid.junit.extensions.util; version=0.7," + - "org.apache.qpid; version=0.7," + - "org.apache.qpid.common; version=0.7," + - "org.apache.qpid.exchange; version=0.7," + - "org.apache.qpid.framing; version=0.7," + - "org.apache.qpid.management.common.mbeans.annotations; version=0.7," + - "org.apache.qpid.protocol; version=0.7," + - "org.apache.qpid.server.binding; version=0.7," + - "org.apache.qpid.server.configuration; version=0.7," + - "org.apache.qpid.server.configuration.plugins; version=0.7," + - "org.apache.qpid.server.configuration.management; version=0.7," + - "org.apache.qpid.server.exchange; version=0.7," + - "org.apache.qpid.server.logging; version=0.7," + - "org.apache.qpid.server.logging.actors; version=0.7," + - "org.apache.qpid.server.logging.subjects; version=0.7," + - "org.apache.qpid.server.management; version=0.7," + - "org.apache.qpid.server.persistent; version=0.7," + - "org.apache.qpid.server.plugins; version=0.7," + - "org.apache.qpid.server.protocol; version=0.7," + - "org.apache.qpid.server.queue; version=0.7," + - "org.apache.qpid.server.registry; version=0.7," + - "org.apache.qpid.server.security; version=0.7," + - "org.apache.qpid.server.security.access; version=0.7," + - "org.apache.qpid.server.security.access.plugins; version=0.7," + - "org.apache.qpid.server.virtualhost; version=0.7," + - "org.apache.qpid.server.virtualhost.plugins; version=0.7," + - "org.apache.qpid.util; version=0.7," + + "org.apache.qpid.junit.extensions.util; " + QPID_VER_SUFFIX + + "org.apache.qpid; " + QPID_VER_SUFFIX + + "org.apache.qpid.common; " + QPID_VER_SUFFIX + + "org.apache.qpid.exchange; " + QPID_VER_SUFFIX + + "org.apache.qpid.framing; " + QPID_VER_SUFFIX + + "org.apache.qpid.management.common.mbeans.annotations; " + QPID_VER_SUFFIX + + "org.apache.qpid.protocol; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.binding; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.configuration; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.configuration.plugins; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.configuration.management; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.exchange; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.logging; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.logging.actors; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.logging.subjects; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.management; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.persistent; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.plugins; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.protocol; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.queue; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.registry; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.security; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.security.access; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.security.access.plugins; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.virtualhost; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.virtualhost.plugins; " + QPID_VER_SUFFIX + + "org.apache.qpid.util; " + QPID_VER_SUFFIX + "org.apache.commons.configuration; version=1.0.0," + "org.apache.commons.lang; version=1.0.0," + "org.apache.commons.lang.builder; version=1.0.0," + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 1185557d8f..c339bd9f90 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -361,7 +361,6 @@ public class AMQProtocolEngine implements Receiver<java.nio.ByteBuffer>, Managab mechanisms.getBytes(), locales.getBytes()); _sender.send(responseBody.generateFrame(0).toNioByteBuffer()); - } catch (AMQException e) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java index 77101e7d58..b009b6f522 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java @@ -135,7 +135,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed public String getVersion() { - return (_protocolSession.getClientVersion() == null) ? null : _protocolSession.getClientVersion().toString(); + return _protocolSession.getClientVersion(); } public Date getLastIoTime() @@ -324,7 +324,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed public void notifyClients(String notificationMsg) { Notification n = - new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber, + new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, _notificationSequenceNumber.getAndIncrement(), System.currentTimeMillis(), notificationMsg); _broadcaster.sendNotification(n); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiver.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiver.java index 2fdf27d1aa..3a5bc7de48 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiver.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiver.java @@ -54,7 +54,7 @@ public class BrokerReceiver implements Receiver<java.nio.ByteBuffer>, LogSubject private IApplicationRegistry _appRegistry; private volatile Receiver<java.nio.ByteBuffer> _delegate = new SelfDelegateProtocolEngine(); - + public BrokerReceiver(IApplicationRegistry appRegistry, String fqdn, Set<VERSION> supported, diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java index 92b0236b6c..3befd43d89 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java @@ -38,7 +38,7 @@ import org.apache.qpid.transport.network.NetworkConnection; public class ProtocolEngine_0_10 extends InputHandler implements ConnectionConfig { - public static final int MAX_FRAME_SIZE = 64 * 1024 - 1; + public static final int MAX_FRAME_SIZE = Integer.getInteger("qpid.maxFrameSize", 64 * 1024 - 1); private NetworkConnection _network; private ServerConnection _connection; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index b5294b6d2f..784582b83e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -57,7 +57,44 @@ import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; import java.text.SimpleDateFormat; -import java.util.*; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Set; + +import javax.management.JMException; +import javax.management.MBeanException; +import javax.management.MBeanNotificationInfo; +import javax.management.Notification; +import javax.management.OperationsException; +import javax.management.monitor.MonitorNotification; +import javax.management.openmbean.ArrayType; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.management.common.mbeans.ManagedQueue; +import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor; +import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; +import org.apache.qpid.server.management.AMQManagedObject; +import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.MessageTransferMessage; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.txn.LocalTransaction; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.transport.MessageProperties; /** * AMQQueueMBean is the management bean for an {@link AMQQueue}. @@ -298,7 +335,6 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que */ public void checkForNotification(ServerMessage msg) throws AMQException { - final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks(); if(!notificationChecks.isEmpty()) @@ -317,7 +353,6 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que } } } - } /** @@ -330,7 +365,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que notificationMsg = notification.name() + " " + notificationMsg; _lastNotification = - new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber, + new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, _notificationSequenceNumber.getAndIncrement(), System.currentTimeMillis(), notificationMsg); _broadcaster.sendNotification(_lastNotification); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index f1407b8770..580fe8e834 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -55,6 +55,7 @@ import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticat import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.transport.network.IncomingNetworkTransport; import org.apache.qpid.transport.network.NetworkTransport; /** @@ -71,7 +72,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry protected final ServerConfiguration _configuration; - protected final Map<Integer, NetworkTransport> _transports = new HashMap<Integer, NetworkTransport>(); + protected final Map<Integer, IncomingNetworkTransport> _transports = new HashMap<Integer, IncomingNetworkTransport>(); protected ManagedObjectRegistry _managedObjectRegistry; @@ -374,12 +375,12 @@ public abstract class ApplicationRegistry implements IApplicationRegistry try { transport.close(); + CurrentActor.get().message(BrokerMessages.SHUTTING_DOWN(transport.getAddress().toString(), port)); } catch (Throwable e) { _logger.error("Unable to close network driver due to:" + e.getMessage()); } - CurrentActor.get().message(BrokerMessages.SHUTTING_DOWN(transport.getAddress().toString(), port)); } } } @@ -389,7 +390,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry return _configuration; } - public void registerTransport(int port, NetworkTransport transport) + public void registerTransport(int port, IncomingNetworkTransport transport) { synchronized (_transports) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java index 9d138055bf..3357a42e68 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java @@ -36,6 +36,7 @@ import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.transport.network.IncomingNetworkTransport; import org.apache.qpid.transport.network.NetworkTransport; public interface IApplicationRegistry @@ -81,7 +82,7 @@ public interface IApplicationRegistry /** * Register any network transports for this registry */ - void registerTransport(int port, NetworkTransport transport); + void registerTransport(int port, IncomingNetworkTransport transport); public UUID getBrokerId(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 0865165925..2e694b24ea 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -134,6 +134,8 @@ public class DerbyMessageStore implements MessageStore private static final String DELETE_FROM_META_DATA = "DELETE FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?"; private static final String SELECT_ALL_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME; + private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006"; + private LogSubject _logSubject; private boolean _configured; @@ -631,9 +633,9 @@ public class DerbyMessageStore implements MessageStore } catch (SQLException e) { - if (e.getSQLState().equalsIgnoreCase("XJ015")) + if (e.getSQLState().equalsIgnoreCase(DERBY_SINGLE_DB_SHUTDOWN_CODE)) { - //XJ015 is expected and represents a clean shutdown, do nothing. + //expected and represents a clean shutdown of this database only, do nothing. } else { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 9952700ae1..511d8e7fed 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -906,10 +906,9 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr public String toLogString() { String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(), - _queue.getNameShortString()); - String result = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) + "(" - // queueString is "vh(/{0})/qu({1}) " so need to trim - + queueInfo.substring(0, queueInfo.length() - 1) + ")" + "] "; + _queue.getNameShortString()); + String result = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) + + "(" + queueInfo.trim() + ")" + "] "; return result; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index d6abee45d8..2439e607b1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -91,6 +91,10 @@ public class ServerConnection extends Connection implements AMQConnectionModel, if (state == State.CLOSED) { CurrentActor.get().message(this, ConnectionMessages.CLOSE()); + if (_virtualHost != null) + { + _virtualHost.getConnectionRegistry().deregisterConnection(this); + } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java index 2db1944cd1..9ba9e2f4a4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java @@ -45,13 +45,13 @@ public abstract class HouseKeepingTask implements Runnable { // Don't need to undo this as this is a thread pool thread so will // always go through here before we do any real work. - Thread.currentThread().setName(_name); + //Thread.currentThread().setName(_name); // XXX temporary CurrentActor.set(new AbstractActor(_rootLogger) { @Override public String getLogMessage() { - return _name; + return _name + " "; } }); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index c54173a281..1038e8fbd0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import javax.management.NotCompliantMBeanException; @@ -215,6 +216,25 @@ public class VirtualHostImpl implements VirtualHost _connectionRegistry = new ConnectionRegistry(); _houseKeepingTasks = new ScheduledThreadPoolExecutor(_configuration.getHouseKeepingThreadCount()); + _houseKeepingTasks.setThreadFactory(new ThreadFactory() + { + public Thread newThread(Runnable r) + { + Thread t = new Thread(r); + String name = "HouseKeeping"; + StackTraceElement[] trace = Thread.currentThread().getStackTrace(); + for (StackTraceElement elt : trace) + { + if (elt.getClassName().endsWith("Test")) + { + name += "-" + elt.getClassName(); +// break; // FIXME + } + } + t.setName(name); + return t; + } + }); _queueRegistry = new DefaultQueueRegistry(this); @@ -248,6 +268,7 @@ public class VirtualHostImpl implements VirtualHost _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); _brokerMBean.register(); + initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod()); } @@ -275,12 +296,22 @@ public class VirtualHostImpl implements VirtualHost } catch (Exception e) { - _logger.error("Exception in housekeeping for queue: " - + q.getNameShortString().toString(), e); - //Don't throw exceptions as this will stop the - // house keeping task from running. + _logger.error("Exception in housekeeping for queue: " + q.getName(), e); + // Don't throw exceptions as this will stop the task from running. } } + } + } + + class CheckTransactionsTask extends HouseKeepingTask + { + public CheckTransactionsTask(VirtualHost vhost) + { + super(vhost); + } + + public void execute() + { for (AMQConnectionModel connection : getConnectionRegistry().getConnections()) { _logger.debug("Checking for long running open transactions on connection " + connection); @@ -293,17 +324,19 @@ public class VirtualHostImpl implements VirtualHost _configuration.getTransactionTimeoutOpenClose(), _configuration.getTransactionTimeoutIdleWarn(), _configuration.getTransactionTimeoutIdleClose()); - } + } catch (Exception e) { _logger.error("Exception in housekeeping for connection: " + connection.toString(), e); + // Don't throw exceptions as this will stop the task from running. } } } } - } + }; scheduleHouseKeepingTask(period, new ExpiredMessagesTask(this)); + scheduleHouseKeepingTask(period, new CheckTransactionsTask(this)); Map<String, VirtualHostPluginFactory> plugins = ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins(); @@ -368,13 +401,11 @@ public class VirtualHostImpl implements VirtualHost _houseKeepingTasks.setCorePoolSize(newSize); } - public int getHouseKeepingActiveCount() { return _houseKeepingTasks.getActiveCount(); } - private void initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception { String messageStoreClass = hostConfig.getMessageStoreClass(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java index 12206013eb..3346f80b7c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java @@ -83,9 +83,11 @@ public class ConfiguredQueueBindingListener implements BindingListener if (config != null) { _cache.add(queue); + _log.error("=== SCD === ADD " + queue.getName()); } else { + _log.error("=== SCD === REMOVE " + queue.getName()); _cache.remove(queue); } } diff --git a/qpid/java/build.deps b/qpid/java/build.deps index 815ff35058..b56c79f588 100644 --- a/qpid/java/build.deps +++ b/qpid/java/build.deps @@ -1,3 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +backport-util-concurrent=lib/backport-util-concurrent-2.2.jar + commons-beanutils-core=lib/commons-beanutils-core-1.8.0.jar commons-cli=lib/commons-cli-1.0.jar commons-codec=lib/commons-codec-1.3.jar @@ -22,8 +43,8 @@ log4j=lib/log4j-1.2.12.jar mina-core=lib/mina-core-1.1.7.jar mina-filter-ssl=lib/mina-filter-ssl-1.1.7.jar -slf4j-api=lib/slf4j-api-1.4.0.jar -slf4j-log4j=lib/slf4j-log4j12-1.4.0.jar +slf4j-api=lib/slf4j-api-1.6.1.jar +slf4j-log4j=lib/slf4j-log4j12-1.6.1.jar xalan=lib/xalan-2.7.0.jar @@ -75,9 +96,15 @@ felix.libs=${felix-framework} commons-configuration.libs = ${commons-beanutils-core} ${commons-digester} \ ${commons-codec} ${commons-lang} ${commons-collections} ${commons-configuration} +<<<<<<< .working common.libs=${slf4j-api} ${mina-core} ${mina-filter-ssl} client.libs=${geronimo-jms} ${common.libs} tools.libs=${commons-configuration.libs} ${broker.libs} +======= +common.libs=${slf4j-api} ${backport-util-concurrent} ${mina-core} ${mina-filter-ssl} +client.libs=${geronimo-jms} +tools.libs=${commons-configuration.libs} ${log4j} +>>>>>>> .merge-right.r1042616 broker.libs=${commons-cli} ${commons-logging} ${log4j} ${slf4j-log4j} \ ${xalan} ${felix.libs} ${derby-db} ${commons-configuration.libs} diff --git a/qpid/java/client/README.txt b/qpid/java/client/README.txt new file mode 100644 index 0000000000..57a98cc978 --- /dev/null +++ b/qpid/java/client/README.txt @@ -0,0 +1,51 @@ +Documentation +============= + +You can access documentation for the client via our website at: +http://qpid.apache.org/documentation + +and via our wiki at: +http://cwiki.apache.org/confluence/display/qpid/Qpid+Java+Documentation + +The client uses the Java Message Service (JMS) 1.1 API, information on which is +widely available using your favoured search engine. + + +Running the Examples: +===================== + +1. From the client Binary distribution: + +From the <installation path>/qpid-client-<version> directory, there are examples +provided in source form in the example/src sub-directory. These are also +provided in binary form in the example/lib directory in order that they can be +run more easily. + +E.g, in order to run the Hello example, you would add the client+example library +files to the java classpath and launch the example like follows: + +java -cp "lib/qpid-all.jar:example/lib/qpid-client-examples-<version>.jar" \ + org.apache.qpid.example.Hello + +NOTE: The client uses the SL4FJ API for its logging. You must supply a logging +implementation of your choice (eg Log4J) and its associated SLF4J binding, by +also adding them to the Java classpath as well as the client libraries +themselves. Failure to do so will result in a warning being output and use of +NoOp logging by the client. + +More information on using SLF4J is available at http://www.slf4j.org/manual.html +which details some of the supported logging implementations and their +associated SLF4 bindings as available in the SLF4J distribution. + + + +2. From the Source distribution / repository: + +Run 'ant build' in the parent directory from where this file is stored, ie: +<installation path>/qpid/java + +This will build the various Java modules, leaving binary .jar files output in: +<installation path>/qpid/java/build/lib + +Taking the above the 'distribution directory', consult the README.txt file at: +<installation path>/qpid/java/client/example/src/main/java diff --git a/qpid/java/client/build.xml b/qpid/java/client/build.xml index 3c6132dc5b..d52de8dca6 100644 --- a/qpid/java/client/build.xml +++ b/qpid/java/client/build.xml @@ -27,6 +27,9 @@ <import file="../module.xml"/> + <property name="example.src.dir" value="${project.root}/client/example/src/main/java" /> + <property name="example.jar.file" value="${build.lib}/qpid-client-example-${project.version}.jar" /> + <property name="output.dir" value="${module.precompiled}/org/apache/qpid/filter/selector"/> <target name="precompile"> @@ -46,6 +49,23 @@ classpathref="module.class.path" packagenames="org.apache.qpid.jms"/> </target> + <target name="release-bin-copy-examples"> + <copy todir="${module.release}/example/src" failonerror="true"> + <fileset dir="${example.src.dir}" excludes="runSample.sh README.txt" /> + </copy> + <copy todir="${module.release}/example/lib" failonerror="true"> + <fileset file="${example.jar.file}"/> + </copy> + </target> + + <target name="release-bin-copy-readme"> + <copy todir="${module.release}" overwrite="true" failonerror="true"> + <fileset file="${basedir}/README.txt" /> + </copy> + </target> + + <target name="release-bin-other" depends="release-bin-copy-examples,release-bin-copy-readme"/> + <target name="release-bin" depends="release-bin-tasks"/> <target name="bundle" depends="bundle-tasks"/> diff --git a/qpid/java/client/example/bin/README.txt b/qpid/java/client/example/bin/README.txt deleted file mode 100644 index 9a1ce91d41..0000000000 --- a/qpid/java/client/example/bin/README.txt +++ /dev/null @@ -1,11 +0,0 @@ -= Qpid Java Examples = - -For more information read ../README.txt. - -== The Verify All Script == - -The verify_all script will run Java examples against itself and against the C++ -and Python examples. The success of the script is determined by comparing its -output against what is expected. - -This script uses the verify script found in qpid/cpp/examples. diff --git a/qpid/java/client/example/bin/set_classpath.bat b/qpid/java/client/example/bin/set_classpath.bat deleted file mode 100644 index 862e8e467a..0000000000 --- a/qpid/java/client/example/bin/set_classpath.bat +++ /dev/null @@ -1,49 +0,0 @@ -@REM Licensed to the Apache Software Foundation (ASF) under one
-@REM or more contributor license agreements. See the NOTICE file
-@REM distributed with this work for additional information
-@REM regarding copyright ownership. The ASF licenses this file
-@REM to you under the Apache License, Version 2.0 (the
-@REM "License"); you may not use this file except in compliance
-@REM with the License. You may obtain a copy of the License at
-@REM
-@REM http://www.apache.org/licenses/LICENSE-2.0
-@REM
-@REM Unless required by applicable law or agreed to in writing,
-@REM software distributed under the License is distributed on an
-@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-@REM KIND, either express or implied. See the License for the
-@REM specific language governing permissions and limitations
-@REM under the License.
-
-@REM Helper script to set classpath for running Qpid example classes
-@REM NB: You must add the Qpid client and common jars to your CLASSPATH
-@REM before running this script
-
-@echo off
-
-if "%QPID_HOME%" == "" GOTO ERROR_QPID_HOME
-
-set QPIDLIB=%QPID_HOME%\lib
-
-if "%CLASSPATH%" == "" GOTO ERROR_CLASSPATH
-
-set CLASSPATH=%CLASSPATH%;%QPIDLIB%\geronimo-jms_1.1_spec-1.0.jar
-set CLASSPATH=%CLASSPATH%;%QPIDLIB%\commons-collections-3.1.jar
-set CLASSPATH=%CLASSPATH%;%QPIDLIB%\commons-configuration-1.2.jar
-set CLASSPATH=%CLASSPATH%;%QPIDLIB%\commons-cli-1.0.jar
-set CLASSPATH=%CLASSPATH%;%QPIDLIB%\commons-lang-2.1.jar
-set CLASSPATH=%CLASSPATH%;%QPIDLIB%\commons-logging-api-1.0.4.jar
-set CLASSPATH=%CLASSPATH%;%QPIDLIB%\commons-logging-1.0.jar
-set CLASSPATH=%CLASSPATH%;%QPIDLIB%\log4j-1.2.12.jar
-set CLASSPATH=%CLASSPATH%;%QPIDLIB%\mina-core-1.1.7.jar
-set CLASSPATH=%CLASSPATH%;%QPIDLIB%\mina-filter-ssl-1.1.7.jar
-set CLASSPATH=%CLASSPATH%;%QPIDLIB%\mina-java5-1.0.0.jar
-set CLASSPATH=%CLASSPATH%;%QPIDLIB%\slf4j-simple-1.0.jar
-
-GOTO END
-
-:ERROR_CLASSPATH
-Echo Please set set your CLASSPATH variable to include the Qpid client and common jars. Exiting ....
-:ERROR_QPID_HOME
-Echo Please set QPID_HOME variable. Exiting ....
-:END
diff --git a/qpid/java/client/example/bin/set_classpath.sh b/qpid/java/client/example/bin/set_classpath.sh deleted file mode 100755 index a4f1b93625..0000000000 --- a/qpid/java/client/example/bin/set_classpath.sh +++ /dev/null @@ -1,82 +0,0 @@ -#!/bin/sh -xv -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Helper script to set classpath for running Qpid example classes -# NB: You must add the Qpid client and common jars to your CLASSPATH -# before running this script - - -cygwin=false -if [[ "$(uname -a | fgrep Cygwin)" != "" ]]; then - cygwin=true -fi - -#Should have set the QPID_HOME var after install to the working dir e.g. home/qpid/qpid-1.0-incubating-M2-SNAPSHOT -if [ "$QPID_HOME" = "" ] ; then - echo "ERROR: Please set QPID_HOME variable. Exiting ...." - exit 1 -else - QPIDLIB=$QPID_HOME/lib -fi - -if $cygwin; then - QPIDLIB=$(cygpath -w $QPIDLIB) -fi - -if [ "$CLASSPATH" = "" ] ; then - echo "ERROR: Please set set your CLASSPATH variable to include the Qpid client and common jars. Exiting ...." - exit 2 -fi - -#Converts paths for cygwin if req -#Some nasty concatenation to get round cygpath line limits -if $cygwin; then - SEP=";" - CLASSPATH=`cygpath -w $CLASSPATH` - CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/geronimo-jms_1.1_spec-1.0.jar` - CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/commons-collections-3.1.jar` - CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/commons-configuration-1.2.jar` - CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/commons-cli-1.0.jar` - CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/commons-lang-2.1.jar` - CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/commons-logging-api-1.0.4.jar` - CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/commons-logging-1.0.jar` - CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/log4j-1.2.12.jar` - CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/mina-core-1.1.7.jar` - CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/mina-filter-ssl-1.1.7.jar` - CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/mina-java5-1.0.0.jar` - CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/slf4j-simple-1.0.jar` - export CLASSPATH -else - CLASSPATH=$CLASSPATH:$QPIDLIB/backport-util-concurrent-2.2.jar - CLASSPATH=$CLASSPATH:$QPIDLIB/geronimo-jms_1.1_spec-1.0.jar - CLASSPATH=$CLASSPATH:$QPIDLIB/commons-collections-3.1.jar - CLASSPATH=$CLASSPATH:$QPIDLIB/commons-configuration-1.2.jar - CLASSPATH=$CLASSPATH:$QPIDLIB/commons-cli-1.0.jar - CLASSPATH=$CLASSPATH:$QPIDLIB/commons-lang-2.1.jar - CLASSPATH=$CLASSPATH:$QPIDLIB/commons-logging-api-1.0.4.jar - CLASSPATH=$CLASSPATH:$QPIDLIB/commons-logging-1.0.jar - CLASSPATH=$CLASSPATH:$QPIDLIB/log4j-1.2.12.jar - CLASSPATH=$CLASSPATH:$QPIDLIB/mina-core-1.0.0.jar - CLASSPATH=$CLASSPATH:$QPIDLIB/mina-filter-ssl-1.0.0.jar - CLASSPATH=$CLASSPATH:$QPIDLIB/mina-java5-1.0.0.jar - CLASSPATH=$CLASSPATH:$QPIDLIB/slf4j-simple-1.0.jar - export CLASSPATH -fi - diff --git a/qpid/java/client/example/source-jar.xml b/qpid/java/client/example/source-jar.xml deleted file mode 100644 index 60451448b8..0000000000 --- a/qpid/java/client/example/source-jar.xml +++ /dev/null @@ -1,35 +0,0 @@ -<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-<!-- This is an assembly descriptor that produces a jar file that contains all the
- dependencies, fully expanded into a single jar, required to run the tests of
- a maven project.
- -->
-<assembly>
- <id>source</id>
- <formats>
- <format>jar</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <fileSets>
- <fileSet>
- <directory>src/main/java</directory>
- <outputDirectory></outputDirectory>
- </fileSet>
- </fileSets>
-</assembly>
diff --git a/qpid/java/client/example/src/main/java/README.txt b/qpid/java/client/example/src/main/java/README.txt index 7b371f17b5..757054e492 100644 --- a/qpid/java/client/example/src/main/java/README.txt +++ b/qpid/java/client/example/src/main/java/README.txt @@ -23,267 +23,11 @@ Note: you must have write privileges to this directory in order to run the examples. -Running the Direct Examples +Running the Examples =========================== To run these programs, do the following: - 1. Make sure that a qpidd broker is running: - - $ ps -eaf | grep qpidd - - If a broker is running, you should see the qpidd process in the - output of the above command. - - 2. In the java directory, use runSample.sh to run the Consumer - program: - - $ ./runSample.sh org.apache.qpid.example.jmsexample.direct.Consumer - Using QPID_HOME: /usr/share/java/ - Using QPID_SAMPLE: /usr/share/doc/rhm-0.3 - Consumer: Setting an ExceptionListener on the connection as sample uses a MessageConsumer - Consumer: Creating a non-transacted, auto-acknowledged session - Consumer: Creating a MessageConsumer - Consumer: Starting connection so MessageConsumer can receive messages - - 3. In a separate window, use runSample.sh to run the Producer - program: - - $ ./runSample.sh org.apache.qpid.example.jmsexample.direct.Producer - Using QPID_HOME: /usr/share/java/ - Using QPID_SAMPLE: /usr/share/doc/rhm-0.3 - Producer: Creating a non-transacted, auto-acknowledged session - Producer: Creating a Message Producer - Producer: Creating a TestMessage to send to the destination - Producer: Sending message: 1 - Producer: Sending message: 2 - Producer: Sending message: 3 - Producer: Sending message: 4 - Producer: Sending message: 5 - Producer: Sending message: 6 - Producer: Sending message: 7 - Producer: Sending message: 8 - Producer: Sending message: 9 - Producer: Sending message: 10 - Producer: Closing connection - Producer: Closing JNDI context - - 4. Now go back to the window where the Consumer program is - running. You should see the following output: - - Consumer: Received message: Message 1 - Consumer: Received message: Message 2 - Consumer: Received message: Message 3 - Consumer: Received message: Message 4 - Consumer: Received message: Message 5 - Consumer: Received message: Message 6 - Consumer: Received message: Message 7 - Consumer: Received message: Message 8 - Consumer: Received message: Message 9 - Consumer: Received message: Message 10 - Consumer: Received final message That's all, folks! - Consumer: Closing connection - Consumer: Closing JNDI context - - - -Running the Fanout Examples -=========================== - -To run these programs, do the following: - - 1. Make sure that a qpidd broker is running: - - $ ps -eaf | grep qpidd - - If a broker is running, you should see the qpidd process in the - output of the above command. - - 2. In the java directory, use runSample.sh to run the Consumer or - Listener program, specifying a unique queue name, which must be - “fanoutQueue1”, “fanoutQueue2”, or “fanoutQueue3”: - - $ ./runSample.sh org.apache.qpid.example.jmsexample.fanout.Consumer fanoutQueue1 - Using QPID_HOME: /usr/share/java/ - Using QPID_SAMPLE: /usr/share/doc/rhm-0.3 - Consumer: Setting an ExceptionListener on the connection as sample uses a MessageConsumer - Consumer: Creating a non-transacted, auto-acknowledged session - Consumer: Creating a MessageConsumer - Consumer: Starting connection so MessageConsumer can receive messages - - You can do this in up to three windows, specifying a different - name for each queue. - - 3. In a separate window, use runSample.sh to run the Producer - program: - - $ ./runSample.sh org.apache.qpid.example.jmsexample.fanout.Producer - Using QPID_HOME: /usr/share/java/ - Using QPID_SAMPLE: /usr/share/doc/rhm-0.3 - Producer: Creating a non-transacted, auto-acknowledged session - Producer: Creating a Message Producer - Producer: Creating a TestMessage to send to the destination - Producer: Sending message: 1 - Producer: Sending message: 2 - Producer: Sending message: 3 - Producer: Sending message: 4 - Producer: Sending message: 5 - Producer: Sending message: 6 - Producer: Sending message: 7 - Producer: Sending message: 8 - Producer: Sending message: 9 - Producer: Sending message: 10 - Producer: Closing connection - Producer: Closing JNDI context - - 4. Now go back to the window where the Listener program is - running. You should see output like this: - - Consumer: Received message: Message 1 - Consumer: Received message: Message 2 - Consumer: Received message: Message 3 - Consumer: Received message: Message 4 - Consumer: Received message: Message 5 - Consumer: Received message: Message 6 - Consumer: Received message: Message 7 - Consumer: Received message: Message 8 - Consumer: Received message: Message 9 - Consumer: Received message: Message 10 - Consumer: Received final message That's all, folks! - Consumer: Closing connection - Consumer: Closing JNDI context - - -Running the Publish/Subscribe Examples -====================================== - -To run these programs, do the following: - - 1. Make sure that a qpidd broker is running: - - $ ps -eaf | grep qpidd - - If a broker is running, you should see the qpidd process in the - output of the above command. - - 2. In the java directory, use runSample.sh to run the Listener - program: - - $ ./runSample.sh org.apache.qpid.example.jmsexample.pubsub.Listener - Using QPID_HOME: /usr/share/java/ - Using QPID_SAMPLE: /usr/share/doc/rhm-0.3 - Listener: Setting an ExceptionListener on the connection as sample uses a TopicSubscriber - Listener: Creating a non-transacted, auto-acknowledged session - Listener: Creating a Message Subscriber for topic usa - Listener: Creating a Message Subscriber for topic europe - Listener: Creating a Message Subscriber for topic news - Listener: Creating a Message Subscriber for topic weather - Listener: Starting connection so TopicSubscriber can receive messages - - 3. In a separate window, use runSample.sh to run the Publisher - program: - - $ ./runSample.sh org.apache.qpid.example.jmsexample.pubsub.Publisher - Using QPID_HOME: /usr/share/java/ - Using QPID_SAMPLE: /usr/share/doc/rhm-0.3 - Publisher: Creating a non-transacted, auto-acknowledged session - Publisher: Creating a TestMessage to send to the topics - Publisher: Creating a Message Publisher for topic usa.weather - Publisher: Sending message 1 - Publisher: Sending message 2 - Publisher: Sending message 3 - Publisher: Sending message 4 - Publisher: Sending message 5 - Publisher: Sending message 6 - Publisher: Creating a Message Publisher for topic usa.news - Publisher: Sending message 1 - Publisher: Sending message 2 - Publisher: Sending message 3 - Publisher: Sending message 4 - Publisher: Sending message 5 - Publisher: Sending message 6 - Publisher: Creating a Message Publisher for topic europe.weather - Publisher: Sending message 1 - Publisher: Sending message 2 - Publisher: Sending message 3 - Publisher: Sending message 4 - Publisher: Sending message 5 - Publisher: Sending message 6 - Publisher: Creating a Message Publisher for topic europe.news - Publisher: Sending message 1 - Publisher: Sending message 2 - Publisher: Sending message 3 - Publisher: Sending message 4 - Publisher: Sending message 5 - Publisher: Sending message 6 - Publisher: Closing connection - Publisher: Closing JNDI context - - 4. Now go back to the window where the Listener program is - running. You should see output like this: - - Listener: Received message for topic: usa: message 1 - Listener: Received message for topic: weather: message 1 - Listener: Received message for topic: usa: message 2 - Listener: Received message for topic: weather: message 2 - Listener: Received message for topic: usa: message 3 - Listener: Received message for topic: weather: message 3 - Listener: Received message for topic: usa: message 4 - Listener: Received message for topic: weather: message 4 - Listener: Received message for topic: usa: message 5 - Listener: Received message for topic: weather: message 5 - Listener: Received message for topic: usa: message 6 - Listener: Received message for topic: weather: message 6 - . . . - Listener: Shutting down listener for news - Listener: Shutting down listener for weather - Listener: Shutting down listener for usa - Listener: Shutting down listener for europe - Listener: Closing connection - Listener: Closing JNDI context - - -Running the Request/Response Examples -===================================== - -To run these programs, do the following: - - 1. Make sure that a qpidd broker is running: - - $ ps -eaf | grep qpidd - - If a broker is running, you should see the qpidd process in the output of the above command. - - 2. In the java directory, use runSample.sh to run the Server - program: - - $ ./runSample.sh org.apache.qpid.example.jmsexample.requestResponse.Server - Using QPID_HOME: /usr/share/java/ - Using QPID_SAMPLE: /usr/share/doc/rhm-0.3 - Server: Setting an ExceptionListener on the connection as sample uses a MessageConsumer - Server: Creating a non-transacted, auto-acknowledged session - Server: Creating a MessageConsumer - Server: Creating a MessageProducer - Server: Starting connection so MessageConsumer can receive messages - - 3. In a separate window, use runSample.sh to run the Client - program: - - $ ./runSample.sh org.apache.qpid.example.jmsexample.requestResponse.Client - Using QPID_HOME: /usr/share/java/ - Using QPID_SAMPLE: /usr/share/doc/rhm-0.3 - Client: Setting an ExceptionListener on the connection as sample uses a MessageConsumer - Client: Creating a non-transacted, auto-acknowledged session - Client: Creating a QueueRequestor - Client: Starting connection - Client: Request Content= Twas brillig, and the slithy toves - Client: Response Content= TWAS BRILLIG, AND THE SLITHY TOVES - Client: Request Content= Did gire and gymble in the wabe. - Client: Response Content= DID GIRE AND GYMBLE IN THE WABE. - Client: Request Content= All mimsy were the borogroves, - Client: Response Content= ALL MIMSY WERE THE BOROGROVES, - Client: Request Content= And the mome raths outgrabe. - Client: Response Content= AND THE MOME RATHS OUTGRABE. - Client: Closing connection - Client: Closing JNDI context - + 1. Make sure that a Qpid broker is running. + 2. In the java directory, use runSample.sh to run the program: + $ ./runSample.sh <class name> <arguments>
\ No newline at end of file diff --git a/qpid/java/client/src/main/java/client.bnd b/qpid/java/client/src/main/java/client.bnd index 8f0f936583..0ddd163d4f 100755 --- a/qpid/java/client/src/main/java/client.bnd +++ b/qpid/java/client/src/main/java/client.bnd @@ -1,7 +1,26 @@ -ver: 0.7.0 -
-Bundle-SymbolicName: qpid-client
-Bundle-Version: ${ver}
-Export-Package: *;version=${ver}
+# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +ver: 0.9.0 + +Bundle-SymbolicName: qpid-client +Bundle-Version: ${ver} +Export-Package: *;version=${ver} Bundle-RequiredExecutionEnvironment: J2SE-1.5 diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index dbd742070e..ee3e0767d4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1452,16 +1452,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _logger.info("Not a hard-error connection not closing: " + cause); } - - // deliver the exception if there is a listener - if (_exceptionListener != null) - { - _exceptionListener.onException(je); - } - else - { - _logger.error("Throwable Received but no listener set: " + cause); - } // if we are closing the connection, close sessions first if (closer) @@ -1475,6 +1465,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _logger.error("Error closing all sessions: " + e, e); } } + + // deliver the exception if there is a listener + if (_exceptionListener != null) + { + _exceptionListener.onException(je); + } + else + { + _logger.error("Throwable Received but no listener set: " + cause); + } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 24e5253cc8..75f71a99c0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -577,7 +577,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic try { boolean isTopic; - + Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments()); + if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL) { isTopic = consumer.getDestination() instanceof AMQTopic || @@ -593,9 +594,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic preAcquire = !consumer.isNoConsume() && (isTopic || consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")); + + arguments.putAll( + (Map<? extends String, ? extends Object>) consumer.getDestination().getLink().getSubscription().getArgs()); } - Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments()); + getQpidSession().messageSubscribe (queueName.toString(), String.valueOf(tag), getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 905bf5e111..4bac54b3e4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -107,7 +107,7 @@ public abstract class BasicMessageConsumer<U extends UnprocessedMessage & AMQSes /** * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover */ - private final boolean _exclusive; + protected boolean _exclusive; /** * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per @@ -182,7 +182,7 @@ public abstract class BasicMessageConsumer<U extends UnprocessedMessage & AMQSes _prefetchHigh = prefetchHigh; _prefetchLow = prefetchLow; _exclusive = exclusive; - + _synchronousQueue = new LinkedBlockingQueue(); _autoClose = autoClose; _noConsume = noConsume; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index d0f1f79631..699b52a6b1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -490,4 +490,24 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM clearReceiveQueue(); } } + + public boolean isExclusive() + { + AMQDestination dest = this.getDestination(); + if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + { + if (dest.getAddressType() == AMQDestination.TOPIC_TYPE) + { + return true; + } + else + { + return dest.getLink().getSubscription().isExclusive(); + } + } + else + { + return _exclusive; + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index cae11e3962..32c7ef29de 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -223,8 +223,10 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate String exchange = replyTo.getExchange(); String routingKey = replyTo.getRoutingKey(); - dest = generateDestination(exchange == null ? null : new AMQShortString(exchange), - routingKey == null ? null : new AMQShortString(routingKey)); + dest = generateDestination(exchange == null ? new AMQShortString("") : + new AMQShortString(exchange), + routingKey == null ? new AMQShortString(""): + new AMQShortString(routingKey)); _destinationCache.put(replyTo, new SoftReference<Destination>(dest)); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java index 64d5b16db0..00503cc650 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQDestination.Binding; +import org.apache.qpid.client.messaging.address.Link.Subscription; import org.apache.qpid.client.messaging.address.Node.ExchangeNode; import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.client.messaging.address.Node.UnknownNodeType; @@ -264,6 +265,7 @@ public class AddressHelper public Link getLink() { Link link = new Link(); + link.setSubscription(new Subscription()); if (linkProps != null) { link.setDurable(linkProps.getBoolean(DURABLE) == null ? false @@ -283,7 +285,8 @@ public class AddressHelper .setProducerCapacity(capacityProps .getInt(CAPACITY_TARGET) == null ? 0 : capacityProps.getInt(CAPACITY_TARGET)); - } else + } + else { int cap = linkProps.getInt(CAPACITY) == null ? 0 : linkProps .getInt(CAPACITY); @@ -292,6 +295,21 @@ public class AddressHelper } link.setFilter(linkProps.getString(FILTER)); // so far filter type not used + + if (((Map) address.getOptions().get(LINK)).containsKey(X_SUBSCRIBE)) + { + Map x_subscribe = (Map)((Map) address.getOptions().get(LINK)).get(X_SUBSCRIBE); + + if (x_subscribe.containsKey(ARGUMENTS)) + { + link.getSubscription().setArgs((Map<String,Object>)x_subscribe.get(ARGUMENTS)); + } + + boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ? + Boolean.parseBoolean((String)x_subscribe.get(EXCLUSIVE)): false; + + link.getSubscription().setExclusive(exclusive); + } } return link; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java index 0ebcaf548b..a7d19d1bd5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.client.messaging.address; +import java.util.HashMap; +import java.util.Map; + import org.apache.qpid.client.messaging.address.Node.QueueNode; public class Link @@ -34,6 +37,7 @@ public class Link protected int _consumerCapacity = 0; protected int _producerCapacity = 0; protected Node node; + protected Subscription subscription; public Node getNode() { @@ -114,4 +118,40 @@ public class Link { this.name = name; } + + public Subscription getSubscription() + { + return this.subscription; + } + + public void setSubscription(Subscription subscription) + { + this.subscription = subscription; + } + + public static class Subscription + { + private Map<String,Object> args = new HashMap<String,Object>(); + private boolean exclusive = false; + + public Map<String, Object> getArgs() + { + return args; + } + + public void setArgs(Map<String, Object> args) + { + this.args = args; + } + + public boolean isExclusive() + { + return exclusive; + } + + public void setExclusive(boolean exclusive) + { + this.exclusive = exclusive; + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index edfb4bb16b..10250a1ac0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -66,7 +66,6 @@ import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.NetworkTransport; import org.apache.qpid.transport.network.OutgoingNetworkTransport; import org.apache.qpid.transport.network.Transport; import org.slf4j.Logger; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 4236f20301..44376331ee 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -112,8 +112,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession _protocolHandler = protocolHandler; _protocolVersion = connection.getProtocolVersion(); _logger.info("Using ProtocolVersion for Session:" + _protocolVersion); - _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), - this); + _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), this); _connection = connection; } diff --git a/qpid/java/common.xml b/qpid/java/common.xml index 3ebf07a210..b1f28dc062 100644 --- a/qpid/java/common.xml +++ b/qpid/java/common.xml @@ -23,7 +23,7 @@ <dirname property="project.root" file="${ant.file.common}"/> <property name="project.name" value="qpid"/> - <property name="project.version" value="0.7"/> + <property name="project.version" value="0.9"/> <property name="project.namever" value="${project.name}-${project.version}"/> <property name="resources" location="${project.root}/resources"/> diff --git a/qpid/java/common/src/main/java/common.bnd b/qpid/java/common/src/main/java/common.bnd index 6cd8a52976..ef56ecec9e 100755 --- a/qpid/java/common/src/main/java/common.bnd +++ b/qpid/java/common/src/main/java/common.bnd @@ -1,4 +1,23 @@ -ver: 0.7.0
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+ver: 0.9.0
Bundle-SymbolicName: qpid-common
Bundle-Version: ${ver}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java index 37e731206c..a4db16742a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java @@ -20,13 +20,11 @@ */ package org.apache.qpid.transport.network; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.nio.ByteBuffer; - -import org.apache.qpid.transport.codec.BBDecoder; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.Method; @@ -35,19 +33,16 @@ import org.apache.qpid.transport.ProtocolEvent; import org.apache.qpid.transport.ProtocolHeader; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Struct; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.qpid.transport.codec.BBDecoder; /** * Assembler */ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate { - private static final Logger _log = LoggerFactory.getLogger(Assembler.class); - private final Receiver<ProtocolEvent> receiver; - private final Map<Integer,List<Frame>> segments; - private final Method[] incomplete; + private final Map<Integer, List<Frame>> segments; + private final Map<Integer, Method> incomplete; private final ThreadLocal<BBDecoder> decoder = new ThreadLocal<BBDecoder>() { public BBDecoder initialValue() @@ -59,8 +54,9 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate public Assembler(Receiver<ProtocolEvent> receiver) { this.receiver = receiver; - segments = new HashMap<Integer,List<Frame>>(); - incomplete = new Method[64*1024]; + segments = new HashMap<Integer, List<Frame>>(); + incomplete = new HashMap<Integer, Method>(); +// incomplete = new Method[64*1024]; } private int segmentKey(Frame frame) @@ -102,12 +98,12 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate public void exception(Throwable t) { - this.receiver.exception(t); + receiver.exception(t); } public void closed() { - this.receiver.closed(); + receiver.closed(); } public void init(ProtocolHeader header) @@ -188,7 +184,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate command.read(dec); if (command.hasPayload()) { - incomplete[channel] = command; + incomplete.put(channel, command); } else { @@ -196,8 +192,8 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate } break; case HEADER: - command = incomplete[channel]; - List<Struct> structs = new ArrayList(2); + command = incomplete.get(channel); + List<Struct> structs = new ArrayList<Struct>(2); while (dec.hasRemaining()) { structs.add(dec.readStruct32()); @@ -205,14 +201,14 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate command.setHeader(new Header(structs)); if (frame.isLastSegment()) { - incomplete[channel] = null; + incomplete.remove(channel); emit(channel, command); } break; case BODY: - command = incomplete[channel]; + command = incomplete.get(channel); command.setBody(segment); - incomplete[channel] = null; + incomplete.remove(channel); emit(channel, command); break; default: diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java index 87cabeb874..08b3fae528 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java @@ -20,9 +20,15 @@ */ package org.apache.qpid.transport.network; -import static org.apache.qpid.transport.network.Frame.*; - import static java.lang.Math.min; +import static org.apache.qpid.transport.network.Frame.FIRST_FRAME; +import static org.apache.qpid.transport.network.Frame.FIRST_SEG; +import static org.apache.qpid.transport.network.Frame.HEADER_SIZE; +import static org.apache.qpid.transport.network.Frame.LAST_FRAME; +import static org.apache.qpid.transport.network.Frame.LAST_SEG; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.Method; @@ -35,19 +41,14 @@ import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.Struct; import org.apache.qpid.transport.codec.BBEncoder; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - /** * Disassembler converts protocol events to byte buffers that can be sent on the network. */ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void> { - private final Sender<ByteBuffer> sender; private final int maxPayload; - private final ByteBuffer header; private final Object sendlock = new Object(); private final ThreadLocal<BBEncoder> encoder = new ThreadLocal<BBEncoder>() { @@ -66,8 +67,6 @@ public final class Disassembler implements Sender<ProtocolEvent>, } this.sender = sender; this.maxPayload = maxFrame - HEADER_SIZE; - this.header = ByteBuffer.allocate(HEADER_SIZE); - this.header.order(ByteOrder.BIG_ENDIAN); } @@ -78,39 +77,35 @@ public final class Disassembler implements Sender<ProtocolEvent>, public void flush() { - synchronized (sendlock) - { - sender.flush(); - } + sender.flush(); } public void close() { - synchronized (sendlock) - { - sender.close(); - } + sender.close(); } private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf) { synchronized (sendlock) { - header.put(0, flags); - header.put(1, type); - header.putShort(2, (short) (size + HEADER_SIZE)); - header.put(5, track); - header.putShort(6, (short) channel); - - header.rewind(); - - sender.send(header); - sender.flush(); + ByteBuffer data = ByteBuffer.allocate(size + HEADER_SIZE); + data.order(ByteOrder.BIG_ENDIAN); + + data.put(0, flags); + data.put(1, type); + data.putShort(2, (short) (size + HEADER_SIZE)); + data.put(5, track); + data.putShort(6, (short) channel); + data.position(HEADER_SIZE); int limit = buf.limit(); buf.limit(buf.position() + size); - sender.send(buf); + data.put(buf); buf.limit(limit); + + data.rewind(); + sender.send(data); } } @@ -166,14 +161,6 @@ public final class Disassembler implements Sender<ProtocolEvent>, method(method, SegmentType.COMMAND); } - private ByteBuffer copy(ByteBuffer src) - { - ByteBuffer buf = ByteBuffer.allocate(src.remaining()); - buf.put(src); - buf.flip(); - return buf; - } - private void method(Method method, SegmentType type) { BBEncoder enc = encoder.get(); @@ -228,7 +215,6 @@ public final class Disassembler implements Sender<ProtocolEvent>, { fragment(LAST_SEG, SegmentType.BODY, method, body); } - } } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java index bb7f059d15..c17527c19c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.transport.network; -import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import org.apache.qpid.transport.TransportException; @@ -34,7 +34,7 @@ public class Transport public static final String UDP = "udp"; public static final String VM = "vm"; public static final String SOCKET = "socket"; - public static final String MULTICAST = "multicast"; + public static final String MULTICAST = "multicast"; // TODO public static final int DEFAULT_BUFFER_SIZE = 32 * 1024; public static final long DEFAULT_TIMEOUT = 60000; @@ -43,20 +43,35 @@ public class Transport public static final String MINA_TRANSPORT = "org.apache.qpid.transport.network.mina.MinaNetworkTransport"; public static final String IO_TRANSPORT = "org.apache.qpid.transport.network.io.IoNetworkTransport"; - public static final String NIO_TRANSPORT = "org.apache.qpid.transport.network.nio.NioNetworkTransport"; - public static final String NETTY_TRANSPORT = "org.apache.qpid.transport.network.netty.NettyNetworkTransport"; + public static final String NIO_TRANSPORT = "org.apache.qpid.transport.network.nio.NioNetworkTransport"; // TODO + public static final String NETTY_TRANSPORT = "org.apache.qpid.transport.network.netty.NettyNetworkTransport"; // TODO - private static final List<String> _incoming = new ArrayList<String>(); - private static final List<String> _outgoing = new ArrayList<String>(); + private static final List<String> _incoming = new LinkedList<String>(); + private static final List<String> _outgoing = new LinkedList<String>(); public static void registerIncomingTransport(Class<? extends IncomingNetworkTransport> transport) { - _incoming.add(transport.getName()); + registerTransport(_incoming, transport.getName()); + } + + public static void registerIncomingTransport(String transport) + { + registerTransport(_incoming, transport); } public static void registerOutgoingTransport(Class<? extends OutgoingNetworkTransport> transport) { - _outgoing.add(transport.getName()); + registerTransport(_outgoing, transport.getName()); + } + + public static void registerOutgoingTransport(String transport) + { + registerTransport(_outgoing, transport); + } + + private static void registerTransport(List<String> registered, String transport) + { + registered.add(transport); } public static IncomingNetworkTransport getIncomingTransport() throws TransportException @@ -71,7 +86,7 @@ public class Transport public static OutgoingNetworkTransport getOutgoingTransport(String protocol) throws TransportException { - return (OutgoingNetworkTransport) getTransport("outgoing", _outgoing, MINA_TRANSPORT, protocol); + return (OutgoingNetworkTransport) getTransport("outgoing", _outgoing, IO_TRANSPORT, protocol); } private static NetworkTransport getTransport(String direction, List<String> registered, String defaultTransport, String protocol) @@ -95,7 +110,7 @@ public class Transport try { - String transport = System.getProperty("qpid.transport." + direction, MINA_TRANSPORT); + String transport = System.getProperty("qpid.transport." + direction, defaultTransport); Class<?> clazz = Class.forName(transport); NetworkTransport network = (NetworkTransport) clazz.newInstance(); if (protocol == null || network.isCompatible(protocol)) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index aa480554ea..0aee08adbe 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -66,8 +66,8 @@ public class IoNetworkTransport implements OutgoingNetworkTransport { _socket = new Socket(); - _log.debug("default-SO_RCVBUF : %s", _socket.getReceiveBufferSize()); - _log.debug("default-SO_SNDBUF : %s", _socket.getSendBufferSize()); + _log.debug("default SO_RCVBUF " + _socket.getReceiveBufferSize()); + _log.debug("default SO_SNDBUF " + _socket.getSendBufferSize()); _socket.setTcpNoDelay(noDelay); _socket.setKeepAlive(keepAlive); @@ -75,8 +75,8 @@ public class IoNetworkTransport implements OutgoingNetworkTransport _socket.setReceiveBufferSize(receiveBufferSize); _socket.setReuseAddress(true); - _log.debug("new-SO_RCVBUF : %s", _socket.getReceiveBufferSize()); - _log.debug("new-SO_SNDBUF : %s", _socket.getSendBufferSize()); + _log.debug("new SO_RCVBUF " + _socket.getReceiveBufferSize()); + _log.debug("new SO_SNDBUF " + _socket.getSendBufferSize()); InetAddress address = InetAddress.getByName(settings.getHost()); _socket.connect(new InetSocketAddress(address, settings.getPort())); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java index babfc3d698..d53031e21b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java @@ -24,19 +24,21 @@ import static org.apache.qpid.transport.util.Functions.*; import static org.apache.qpid.configuration.ClientProperties.*; import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.ExecutorThreadModel; import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoFilterAdapter; import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; import org.apache.mina.filter.LoggingFilter; import org.apache.mina.filter.ReadThrottleFilterBuilder; import org.apache.mina.filter.SSLFilter; +import org.apache.mina.filter.executor.ExecutorFilter; import org.apache.mina.util.SessionUtil; import org.apache.qpid.protocol.ReceiverFactory; import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.NetworkTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,12 +49,12 @@ public class MinaNetworkHandler extends IoHandlerAdapter { private static final Logger _log = LoggerFactory.getLogger(MinaNetworkHandler.class); - private NetworkTransport _transport = null; + private MinaNetworkTransport _transport = null; private SSLContextFactory _sslFactory = null; private ReceiverFactory _factory = null; private boolean _debug = false; - public MinaNetworkHandler(NetworkTransport transport, SSLContextFactory sslFactory, ReceiverFactory factory) + public MinaNetworkHandler(MinaNetworkTransport transport, SSLContextFactory sslFactory, ReceiverFactory factory) { _transport = transport; _sslFactory = sslFactory; @@ -60,7 +62,7 @@ public class MinaNetworkHandler extends IoHandlerAdapter _debug = Boolean.getBoolean("amqj.protocol.debug"); } - public MinaNetworkHandler(NetworkTransport transport, SSLContextFactory sslFactory) + public MinaNetworkHandler(MinaNetworkTransport transport, SSLContextFactory sslFactory) { this(transport, sslFactory, null); } @@ -83,6 +85,7 @@ public class MinaNetworkHandler extends IoHandlerAdapter public void exceptionCaught(IoSession ssn, Throwable e) { Receiver<java.nio.ByteBuffer> receiver = (Receiver) ssn.getAttachment(); + _log.error("Caught exception in transport layer", e); receiver.exception(e); } @@ -100,6 +103,12 @@ public class MinaNetworkHandler extends IoHandlerAdapter SessionUtil.initialize(session); IoFilterChain chain = session.getFilterChain(); + if (chain.contains(ExecutorThreadModel.class.getName())) + { + chain.remove(ExecutorThreadModel.class.getName()); + } + IoFilterAdapter filter = new ExecutorFilter(_transport.getExecutor()); + chain.addFirst("sessionExecutor", filter); // Add SSL filter if (_sslFactory != null) @@ -158,8 +167,6 @@ public class MinaNetworkHandler extends IoHandlerAdapter { _log.info("Idle MINA session: " + System.identityHashCode(session)); session.close(); - Receiver<java.nio.ByteBuffer> receiver = (Receiver) session.getAttachment(); - receiver.closed(); } } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java index 2010b2dd93..ac1b959de7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java @@ -28,23 +28,27 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.ExecutorThreadModel; import org.apache.mina.common.IoAcceptor; +import org.apache.mina.common.IoAcceptorConfig; import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoFilterAdapter; +import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.IoServiceConfig; import org.apache.mina.common.IoSession; import org.apache.mina.common.PooledByteBufferAllocator; import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.common.ThreadModel; +import org.apache.mina.filter.executor.ExecutorFilter; import org.apache.mina.transport.socket.nio.DatagramAcceptor; -import org.apache.mina.transport.socket.nio.DatagramAcceptorConfig; import org.apache.mina.transport.socket.nio.DatagramConnector; import org.apache.mina.transport.socket.nio.DatagramSessionConfig; import org.apache.mina.transport.socket.nio.ExistingSocketConnector; import org.apache.mina.transport.socket.nio.SocketAcceptor; -import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import org.apache.mina.transport.socket.nio.SocketConnector; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.mina.transport.vmpipe.VmPipeAcceptor; @@ -71,7 +75,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN public static final List<String> SUPPORTED = Arrays.asList(Transport.SOCKET, Transport.TCP, Transport.UDP, Transport.VM); private int _threads; - private Executor _executor; + private ExecutorService _executor; private ConnectionSettings _settings; private SocketAddress _address; private IoConnector _connector; @@ -93,7 +97,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); } - int processors = Runtime.getRuntime().availableProcessors(); + int processors = (Runtime.getRuntime().availableProcessors() * 4) + 1; _threads = Integer.parseInt(System.getProperty("amqj.processors", Integer.toString(processors))); _executor = Executors.newCachedThreadPool(Threading.getThreadFactory()); } @@ -130,7 +134,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN if (socket == null) { throw new IllegalArgumentException("Active Socket must be provided for broker " + - "with 'socket://<SocketID>' transport"); + "with 'socket://<SocketID>' transport"); } _address = socket.getRemoteSocketAddress(); _connector = new ExistingSocketConnector(1, _executor); @@ -142,25 +146,26 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN } _log.info("Connecting to broker on: " + _address); - String s = "-"; + String name = "MINANetworkTransport(Client)"; StackTraceElement[] trace = Thread.currentThread().getStackTrace(); for (StackTraceElement elt : trace) { - if (elt.getClassName().contains("Test")) + if (elt.getClassName().endsWith("Test")) { - s += elt.getClassName(); - break; + name += "-" + elt.getClassName(); +// break; // FIXME } } - - IoServiceConfig cfg = _connector.getDefaultConfig(); - cfg.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkTransport(Client)" + s)); - + + IoServiceConfig config = _connector.getDefaultConfig(); + config.setThreadModel(ThreadModel.MANUAL); + // Socket based connection configuration only (TCP/SOCKET) if (_connector instanceof SocketConnector) { - SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); + SocketSessionConfig scfg = (SocketSessionConfig) config.getSessionConfig(); scfg.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); + scfg.setKeepAlive(Boolean.getBoolean("amqj.keepAlive")); Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", Transport.DEFAULT_BUFFER_SIZE); Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", Transport.DEFAULT_BUFFER_SIZE); scfg.setSendBufferSize(sendBufferSize); @@ -173,7 +178,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN } // Connect to the broker - ConnectFuture future = _connector.connect(_address, new MinaNetworkHandler(this, sslFactory), cfg); + ConnectFuture future = _connector.connect(_address, new MinaNetworkHandler(this, sslFactory), config); future.join(); if (!future.isConnected()) { @@ -181,6 +186,14 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN } _session = future.getSession(); _session.setAttachment(_receiver); + + IoFilterChain chain = _session.getFilterChain(); + if (chain.contains(ExecutorThreadModel.class.getName())) + { + chain.remove(ExecutorThreadModel.class.getName()); + } + IoFilterAdapter filter = new ExecutorFilter(_executor); + chain.addFirst("clientExecutor", filter); return new MinaNetworkConnection(_session); } @@ -191,9 +204,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN { _acceptor = new SocketAcceptor(_threads, _executor); - SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig(); - sconfig.setDisconnectOnUnbind(true); - SocketSessionConfig ssc = (SocketSessionConfig) sconfig.getSessionConfig(); + SocketSessionConfig ssc = (SocketSessionConfig) _acceptor.getDefaultConfig().getSessionConfig(); ssc.setReuseAddress(true); ssc.setKeepAlive(Boolean.getBoolean("amqj.keepAlive")); ssc.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); @@ -215,9 +226,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN { _acceptor = new DatagramAcceptor(_executor); - DatagramAcceptorConfig dconfig = (DatagramAcceptorConfig) _acceptor.getDefaultConfig(); - dconfig.setDisconnectOnUnbind(true); - DatagramSessionConfig dsc = (DatagramSessionConfig) dconfig.getSessionConfig(); + DatagramSessionConfig dsc = (DatagramSessionConfig) _acceptor.getDefaultConfig().getSessionConfig(); dsc.setReuseAddress(true); Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", Transport.DEFAULT_BUFFER_SIZE); Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", Transport.DEFAULT_BUFFER_SIZE); @@ -235,16 +244,17 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN } else if (settings.getProtocol().equalsIgnoreCase(Transport.VM)) { - _acceptor = new VmPipeAcceptor(); - _address = new VmPipeAddress(settings.getPort()); + _acceptor = new VmPipeAcceptor(); + _address = new VmPipeAddress(settings.getPort()); } else { throw new TransportException("Unknown protocol: " + settings.getProtocol()); } - IoServiceConfig cfg = _acceptor.getDefaultConfig(); - cfg.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkTransport(Broker)")); + IoAcceptorConfig config = (IoAcceptorConfig) _acceptor.getDefaultConfig(); + config.setThreadModel(ThreadModel.MANUAL); + config.setDisconnectOnUnbind(true); try { @@ -255,6 +265,11 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN throw new TransportException("Could not bind to " + _address, e); } } + + public Executor getExecutor() + { + return _executor; + } public SocketAddress getAddress() { @@ -275,6 +290,10 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN { _session.close(); } + if (_executor != null) + { + _executor.shutdownNow(); + } } public boolean isCompatible(String protocol) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java index 5fc3032d35..10d70ed34f 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java @@ -26,20 +26,16 @@ import org.apache.mina.common.IoSession; import org.apache.mina.common.WriteFuture; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import org.apache.qpid.transport.network.Transport; /** * MinaSender */ public class MinaSender implements Sender<java.nio.ByteBuffer> { - private static final Logger _log = LoggerFactory.getLogger(MinaSender.class); - private final IoSession _session; - private WriteFuture _lastWrite; - private int _idleTimeout = 0; + private int _idle = 0; + private WriteFuture _written; public MinaSender(IoSession session) { @@ -52,41 +48,36 @@ public class MinaSender implements Sender<java.nio.ByteBuffer> { throw new TransportException("attempted to write to a closed socket"); } - ByteBuffer mina = ByteBuffer.allocate(buf.capacity()); - mina.put(buf); - mina.flip(); - flush(); - _lastWrite = _session.write(mina); + _written = _session.write(ByteBuffer.wrap(buf)); } public synchronized void flush() { - if (_lastWrite != null) + if (_written != null) { - _lastWrite.join(); - if (!_lastWrite.isWritten()) - { - throw new RuntimeException("Error flushing buffer"); - } + _written.join(Transport.DEFAULT_TIMEOUT); + if (!_written.isWritten()) + { + throw new TransportException("Error flushing data buffer"); + } } } - public void close() + public synchronized void close() { - // MINA will sometimes throw away in-progress writes when you ask it to close flush(); CloseFuture closed = _session.close(); closed.join(); } - public void setIdleTimeout(int i) + public void setIdleTimeout(int idle) { - _idleTimeout = i; - _session.setWriteTimeout(_idleTimeout); + _idle = idle; + _session.setWriteTimeout(_idle); } public long getIdleTimeout() { - return _idleTimeout; + return _idle; } } diff --git a/qpid/java/lib/slf4j-api-1.4.0.jar b/qpid/java/lib/slf4j-api-1.4.0.jar Binary files differdeleted file mode 100644 index 9ce2532aa4..0000000000 --- a/qpid/java/lib/slf4j-api-1.4.0.jar +++ /dev/null diff --git a/qpid/java/lib/slf4j-api-1.6.1.jar b/qpid/java/lib/slf4j-api-1.6.1.jar Binary files differnew file mode 100644 index 0000000000..42e0ad0de7 --- /dev/null +++ b/qpid/java/lib/slf4j-api-1.6.1.jar diff --git a/qpid/java/lib/slf4j-log4j12-1.4.0.jar b/qpid/java/lib/slf4j-log4j12-1.4.0.jar Binary files differdeleted file mode 100644 index e8e09f35ca..0000000000 --- a/qpid/java/lib/slf4j-log4j12-1.4.0.jar +++ /dev/null diff --git a/qpid/java/lib/slf4j-log4j12-1.6.1.jar b/qpid/java/lib/slf4j-log4j12-1.6.1.jar Binary files differnew file mode 100644 index 0000000000..873d11983e --- /dev/null +++ b/qpid/java/lib/slf4j-log4j12-1.6.1.jar diff --git a/qpid/java/management/client/README.txt b/qpid/java/management/client/README.txt index 34a48f1f50..ecd61da75e 100644 --- a/qpid/java/management/client/README.txt +++ b/qpid/java/management/client/README.txt @@ -39,4 +39,4 @@ Administration After QMan has been started successfully you can browse its administration console pointing your browser to :
-http://<host>:<port>/qman/admin.jsp
\ No newline at end of file +http://<host>:<port>/qman/console
diff --git a/qpid/java/management/client/bin/qman-wsdm-start.cmd b/qpid/java/management/client/bin/qman-wsdm-start.cmd index df30ce8617..ec8321c6b8 100644 --- a/qpid/java/management/client/bin/qman-wsdm-start.cmd +++ b/qpid/java/management/client/bin/qman-wsdm-start.cmd @@ -59,8 +59,8 @@ SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\start.jar SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\jetty-6.1.14.jar
SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\jetty-util-6.1.14.jar
SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\geronimo-servlet_2.5_spec-1.2.jar
-SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\slf4j-api-1.4.0.jar
-SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\slf4j-log4j12-1.4.0.jar
+SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\slf4j-api-1.6.1.jar
+SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\slf4j-log4j12-1.6.1.jar
SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\log4j-1.2.12.jar
echo ===============================================================================
@@ -85,4 +85,4 @@ echo. echo ===============================================================================
echo.
-%JAVA% -cp %CLASSPATH% -DQMAN_HOME=%QMAN_HOME% -Djetty.home=%QMAN_HOME% -Dqman.host=%QMAN_WSDM_ADAPTER_HOST% -Dqman.port=%QMAN_WSDM_ADAPTER_PORT% -DSTOP.PORT=%ADMIN_PORT% -DSTOP.KEY=%ADMIN_KEY% -Dqman-config=%QMAN_CONFIG_FILE% org.mortbay.start.Main %JETTY_CONFIG_FILE%
\ No newline at end of file +%JAVA% -cp %CLASSPATH% -DQMAN_HOME=%QMAN_HOME% -Djetty.home=%QMAN_HOME% -Dqman.host=%QMAN_WSDM_ADAPTER_HOST% -Dqman.port=%QMAN_WSDM_ADAPTER_PORT% -DSTOP.PORT=%ADMIN_PORT% -DSTOP.KEY=%ADMIN_KEY% -Dqman-config=%QMAN_CONFIG_FILE% org.mortbay.start.Main %JETTY_CONFIG_FILE%
diff --git a/qpid/java/management/client/bin/qman-wsdm-start.sh b/qpid/java/management/client/bin/qman-wsdm-start.sh index 39a4cba66e..0024890527 100644 --- a/qpid/java/management/client/bin/qman-wsdm-start.sh +++ b/qpid/java/management/client/bin/qman-wsdm-start.sh @@ -58,7 +58,7 @@ ADMIN_KEY=gazzax QMAN_LIBS=$QMAN_HOME/lib JETTY_CONFIG_FILE=$QMAN_HOME/etc/jetty.xml -QMAN_CLASSPATH=$QMAN_HOME/etc:$QMAN_LIBS/start.jar:$QMAN_LIBS/jetty-6.1.14.jar:$QMAN_LIBS/jetty-util-6.1.14.jar:$QMAN_LIBS/geronimo-servlet_2.5_spec-1.2.jar:$QMAN_LIBS/slf4j-api-1.4.0.jar:$QMAN_LIBS/slf4j-log4j12-1.4.0.jar:$QMAN_LIBS/log4j-1.2.12.jar +QMAN_CLASSPATH=$QMAN_HOME/etc:$QMAN_LIBS/start.jar:$QMAN_LIBS/jetty-6.1.14.jar:$QMAN_LIBS/jetty-util-6.1.14.jar:$QMAN_LIBS/geronimo-servlet_2.5_spec-1.2.jar:$QMAN_LIBS/slf4j-api-1.6.1.jar:$QMAN_LIBS/slf4j-log4j12-1.6.1.jar:$QMAN_LIBS/log4j-1.2.12.jar echo "===============================================================================" echo"" @@ -86,4 +86,4 @@ echo"" echo "===============================================================================" echo"" -"$JAVA" $JAVA_OPTS -cp $QMAN_CLASSPATH -DQMAN_HOME=$QMAN_HOME -Djetty.home=$QMAN_HOME -Dqman.host=$QMAN_WSDM_ADAPTER_HOST -Dqman.port=$QMAN_WSDM_ADAPTER_PORT -DSTOP.PORT=$ADMIN_PORT -DSTOP.KEY=$ADMIN_KEY -Dqman-config=$QMAN_CONFIG_FILE org.mortbay.start.Main $JETTY_CONFIG_FILE
\ No newline at end of file +"$JAVA" $JAVA_OPTS -cp $QMAN_CLASSPATH -DQMAN_HOME=$QMAN_HOME -Djetty.home=$QMAN_HOME -Dqman.host=$QMAN_WSDM_ADAPTER_HOST -Dqman.port=$QMAN_WSDM_ADAPTER_PORT -DSTOP.PORT=$ADMIN_PORT -DSTOP.KEY=$ADMIN_KEY -Dqman-config=$QMAN_CONFIG_FILE org.mortbay.start.Main $JETTY_CONFIG_FILE diff --git a/qpid/java/management/client/build.xml b/qpid/java/management/client/build.xml index f623449c4b..a67f69c43d 100644 --- a/qpid/java/management/client/build.xml +++ b/qpid/java/management/client/build.xml @@ -44,9 +44,9 @@ <copy todir="${module.release}" failonerror="false" flatten="true"> <fileset dir="${resources}" excludes="META-INF"> <exclude name="META-INF"/> - <exclude name="README"/> + <exclude name="README.txt"/> </fileset> - <fileset file="${module.build}${file.separator}README"/> + <fileset file="${module.build}${file.separator}README.txt"/> </copy> </target> @@ -153,7 +153,7 @@ <target name="copy-README-to-build"> <copy todir="${module.build}"> <fileset dir="${module.src}${file.separator}..${file.separator}..${file.separator}.."> - <include name="README"/> + <include name="README.txt"/> </fileset> </copy> </target> @@ -163,7 +163,7 @@ <mkdir dir="${examples.folder}${file.separator}sample_messages"/> <copy todir="${examples.folder}"> <fileset dir="${module.src}${file.separator}..${file.separator}..${file.separator}example"> - <include name="README"/> + <include name="README.txt"/> </fileset> </copy> <copy todir="${examples.folder}${file.separator}src"> @@ -175,7 +175,7 @@ <copy todir="${examples.folder}${file.separator}sample_messages"> <fileset dir="${module.src}${file.separator}..${file.separator}..${file.separator}example" > <exclude name="**/*.java"/> - <exclude name="**/README"/> + <exclude name="**/README.txt"/> <include name="**/*.out.*"/> </fileset> </copy> diff --git a/qpid/java/management/common/src/main/java/management-common.bnd b/qpid/java/management/common/src/main/java/management-common.bnd index 3b2c34b06e..cb28d309a6 100644 --- a/qpid/java/management/common/src/main/java/management-common.bnd +++ b/qpid/java/management/common/src/main/java/management-common.bnd @@ -1,8 +1,27 @@ -ver: 0.7.0 -
-Bundle-SymbolicName: qpid-management-common
-Bundle-Version: ${ver}
-Export-Package: *;version=${ver}
+# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +ver: 0.9.0 + +Bundle-SymbolicName: qpid-management-common +Bundle-Version: ${ver} +Export-Package: *;version=${ver} Bundle-RequiredExecutionEnvironment: J2SE-1.5 Require-Bundle: jmxremote.sasl;resolution:=optional diff --git a/qpid/java/management/eclipse-plugin/META-INF/MANIFEST.MF b/qpid/java/management/eclipse-plugin/META-INF/MANIFEST.MF index 7db972995b..124fe1e767 100644 --- a/qpid/java/management/eclipse-plugin/META-INF/MANIFEST.MF +++ b/qpid/java/management/eclipse-plugin/META-INF/MANIFEST.MF @@ -3,7 +3,7 @@ Bundle-License: http://www.apache.org/licenses/LICENSE-2.0.txt Bundle-ManifestVersion: 2 Bundle-Name: Qpid JMX Management Console Plug-in Bundle-SymbolicName: org.apache.qpid.management.ui; singleton:=true -Bundle-Version: 0.7.0 +Bundle-Version: 0.9.0 Bundle-Activator: org.apache.qpid.management.ui.Activator Bundle-Vendor: Apache Software Foundation Bundle-Localization: plugin diff --git a/qpid/java/management/eclipse-plugin/src/main/resources/jmxremote.sasl-plugin/MANIFEST.MF b/qpid/java/management/eclipse-plugin/src/main/resources/jmxremote.sasl-plugin/MANIFEST.MF index fa11bac2ea..83c7c9f435 100644 --- a/qpid/java/management/eclipse-plugin/src/main/resources/jmxremote.sasl-plugin/MANIFEST.MF +++ b/qpid/java/management/eclipse-plugin/src/main/resources/jmxremote.sasl-plugin/MANIFEST.MF @@ -1,4 +1,5 @@ Manifest-Version: 1.0 +Bundle-License: http://www.apache.org/licenses/LICENSE-2.0.txt Bundle-ManifestVersion: 2 Bundle-Name: jmx sasl Plug-in Bundle-SymbolicName: jmxremote.sasl diff --git a/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/Info.plist b/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/Info.plist index e06c8a6e60..c6482a9254 100644 --- a/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/Info.plist +++ b/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/Info.plist @@ -1,5 +1,25 @@ <?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd"> +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> <plist version="1.0"> <dict> <key>CFBundleExecutable</key> diff --git a/qpid/java/release-docs/RELEASE_NOTES.txt b/qpid/java/release-docs/RELEASE_NOTES.txt index 690b04dfc0..f94c45fd4d 100644 --- a/qpid/java/release-docs/RELEASE_NOTES.txt +++ b/qpid/java/release-docs/RELEASE_NOTES.txt @@ -1,9 +1,10 @@ -Apache Qpid Incubating Java M4 Release Notes +Apache Qpid Java 0.8 Release Notes ------------------------------------------- -The Qpid M4 release contains support for AMQP 0-8, 0-9 and 0-10. You +The Qpid 0.8 release contains support for AMQP 0-8, 0-9 and 0-10. You can access the specifications from -http://www.amqp.org/tikiwiki/tiki-index.php?page=Download + +http://www.amqp.org/confluence/display/AMQP/AMQP+Specification For full details of Apache Qpid's capabilities see our detailed project documentation at: @@ -13,21 +14,6 @@ http://cwiki.apache.org/confluence/display/qpid/Qpid+Java+Documentation From the link above you can access our Getting Started Guide, FAQ, Build How To and detailed developer documentation. -New features, Improvements and Bug fixes ----------------------- - -A security related problem was addressed. If Base64MD5 passwords are -turned on on the broker and it has been configured to use JMXMP via -the addition of jxmremote_optional.jar to the classpath, it is -possible for an attacker to bypass the authentication on the JMX -management interface due to a bug in password verification. - -A new command line management interface was added (qpid-cli) - -A full list of changes can be found at: -https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12313279&styleName=Text&projectId=12310520 - - Known Issues/Outstanding Work ----------------------------- diff --git a/qpid/java/systests/etc/config-systests-settings.xml b/qpid/java/systests/etc/config-systests-settings.xml index a7f538aec1..751ff133cb 100644 --- a/qpid/java/systests/etc/config-systests-settings.xml +++ b/qpid/java/systests/etc/config-systests-settings.xml @@ -24,6 +24,8 @@ <enabled>false</enabled> <ssl> <enabled>false</enabled> + <keyStorePath>${QPID_HOME}/../test-profiles/test_resources/ssl/keystore.jks</keyStorePath> + <keyStorePassword>password</keyStorePassword> </ssl> </management> <virtualhosts>${QPID_HOME}/etc/virtualhosts-systests.xml</virtualhosts> diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java index 8946548353..2dded57dd0 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java @@ -138,7 +138,8 @@ public class AMQQueueDeferredOrderingTest extends QpidBrokerTestCase Message msg = _consumer.receive(3000); assertNotNull("Message should not be null", msg); assertTrue("Message should be a text message", msg instanceof TextMessage); - assertEquals("Message content does not match expected", Integer.toString(i), ((TextMessage) msg).getText()); + _logger.error("== " + Integer.toString(i) + " == " + ((TextMessage) msg).getText()); + assertEquals("Message content does not match", Integer.toString(i), ((TextMessage) msg).getText()); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java index acb5d12e57..43a0f4dbec 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java @@ -169,7 +169,7 @@ public class QueueDepthWithSelectorTest extends QpidBrokerTestCase for (int i = 0; i < MSG_COUNT; i++) { _messages[i] = _consumer.receive(1000); - assertNotNull("should have received a message but didn't", _messages[i]); + assertNotNull("should have received a message but didn't " + i, _messages[i]); } // long queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _queue); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index af7c08ca65..1d1f4a53d5 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -280,6 +280,10 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("my-exchange", dest.getQueueName(),"hello", Collections.<String, Object>emptyMap())); + + // The client should be able to query and verify the existence of my-exchange (QPID-2774) + dest = new AMQAnyDestination("ADDR:my-exchange; {create: never}"); + cons = jmsSession.createConsumer(dest); } public void testBindQueueWithArgs() throws Exception @@ -684,9 +688,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } /** - * Test Goal : Verify that unique subscription queues are created when consumers are - * created using the same destination except when the subscription queue - * has a name. + * Test Goal : When the same destination is used when creating two consumers, + * If the type == topic, verify that unique subscription queues are created, + * unless subscription queue has a name. + * + * If the type == queue, same queue should be shared. */ public void testSubscriptionForSameDestination() throws Exception { @@ -715,6 +721,28 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase catch(Exception e) { } + _connection.close(); + + _connection = getConnection() ; + _connection.start(); + ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + dest = ssn.createTopic("ADDR:my_queue; {create: always}"); + consumer1 = ssn.createConsumer(dest); + consumer2 = ssn.createConsumer(dest); + prod = ssn.createProducer(dest); + + prod.send(ssn.createTextMessage("A")); + Message m1 = consumer1.receive(1000); + Message m2 = consumer2.receive(1000); + + if (m1 != null) + { + assertNull("Only one consumer should receive the message",m2); + } + else + { + assertNotNull("Only one consumer should receive the message",m2); + } } public void testXBindingsWithoutExchangeName() throws Exception @@ -752,4 +780,20 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); } + + public void testXSubscribeOverrides() throws Exception + { + Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + String str = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}"; + Destination dest = ssn.createTopic(str); + MessageConsumer consumer1 = ssn.createConsumer(dest); + try + { + MessageConsumer consumer2 = ssn.createConsumer(dest); + fail("An exception should be thrown as 'my-queue' already have an exclusive subscriber"); + } + catch(Exception e) + { + } + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java index 59ce64eb4f..8c5299e301 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java @@ -239,7 +239,8 @@ public class BytesMessageTest extends QpidBrokerTestCase implements MessageListe { if (expected[i] != actual[i]) { - throw new RuntimeException("Failed on byte " + i + " of " + expected.length); + throw new RuntimeException("Failed on byte " + i + " of " + expected.length + + "(" + new String(expected) +", " + new String(actual) + ")"); } } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java index d97e22e024..e6f3ef7493 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java @@ -41,11 +41,18 @@ public class LargeMessageTest extends QpidBrokerTestCase private static final Logger _logger = LoggerFactory.getLogger(LargeMessageTest.class); private Destination _destination; - private AMQSession _session; + private AMQSession<?, ?> _session; private AMQConnection _connection; protected void setUp() throws Exception { + // Smaller packet size for UDP + if (Boolean.getBoolean("profile.udp")) + { + setConfigurationProperty("advanced.framesize", "20000"); + setBrokerEnvironment("qpid.maxFrameSize", "20000"); + } + super.setUp(); try { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java index 17ac0dfff2..50162449e5 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java @@ -46,7 +46,7 @@ public class ConnectionCloseTest extends QpidBrokerTestCase public void testSendReceiveClose() throws Exception { Map<Thread,StackTraceElement[]> before = Thread.getAllStackTraces(); - + for (int i = 0; i < 500; i++) { if ((i % 10) == 0) @@ -92,7 +92,7 @@ public class ConnectionCloseTest extends QpidBrokerTestCase assertTrue("Spurious thread creation exceeded threshold, " + delta.size() + " threads created.", - delta.size() < 100); + delta.size() < 10); } private void dumpStacks(Map<Thread,StackTraceElement[]> map) @@ -104,5 +104,4 @@ public class ConnectionCloseTest extends QpidBrokerTestCase log.warn(t, entry.getKey().toString()); } } - } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index 6bf610ff90..68d774fce6 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -272,6 +272,28 @@ public class ConnectionTest extends QpidBrokerTestCase } connection.close(); } + + public void testUnsupportedSASLMechanism() throws Exception + { + BrokerDetails broker = getBroker(); + broker.setProperty(BrokerDetails.OPTIONS_SASL_MECHS, "MY_MECH"); + + try + { + Connection connection = new AMQConnection(broker.toString(), "guest", "guest", + null, "test"); + connection.close(); + fail("The client should throw a ConnectionException stating the" + + " broker does not support the SASL mech specified by the client"); + } + catch (Exception e) + { + assertTrue("Incorrect exception thrown", + e.getMessage().contains("The following SASL mechanisms " + + "[MY_MECH]" + + " specified by the client are not supported by the broker")); + } + } public static junit.framework.Test suite() { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index d73761d12a..d799b141c0 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -657,7 +657,14 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase session.commit(); // Check queue has no messages - assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + if (isJavaBroker()) + { + assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + } + else + { + assertTrue("At most the queue should have only 1 message", ((AMQSession<?, ?>) session).getQueueDepth(queue) <= 1); + } // Unsubscribe session.unsubscribe("sameMessageSelector"); @@ -671,7 +678,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase * <li>create another durable subscriber with a different selector and same name * <li>check first subscriber is now closed * <li>create a publisher and send messages - * <li>check messages are recieved correctly + * <li>check messages are received correctly * </ul> * <p> * QPID-2418 @@ -704,6 +711,8 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase e.printStackTrace(); } + conn.stop(); + // Send 1 matching message and 1 non-matching message MessageProducer producer = session.createProducer(topic); TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1"); @@ -718,6 +727,8 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorNoClose"); assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + conn.start(); + Message rMsg = subB.receive(1000); assertNotNull(rMsg); assertEquals("Content was wrong", @@ -768,6 +779,8 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase e.printStackTrace(); } + conn.stop(); + // Send 1 matching message and 1 non-matching message MessageProducer producer = session.createProducer(topic); TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1"); @@ -782,6 +795,8 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "subscriptionName"); assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + conn.start(); + Message rMsg = subTwo.receive(1000); assertNotNull(rMsg); assertEquals("Content was wrong", diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java index f631a9a4ba..074c2fa566 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java @@ -243,11 +243,14 @@ public class TransactionTimeoutTestCase extends QpidBrokerTestCase implements Ex /** @see javax.jms.ExceptionListener#onException(javax.jms.JMSException) */ public void onException(JMSException jmse) { - _caught.countDown(); - _message = jmse.getLinkedException().getMessage(); - if (jmse.getLinkedException() instanceof AMQException) + if (_caught.getCount() > 0L) { - _code = ((AMQException) jmse.getLinkedException()).getErrorCode(); + _caught.countDown(); + _message = jmse.getLinkedException().getMessage(); + if (jmse.getLinkedException() instanceof AMQException) + { + _code = ((AMQException) jmse.getLinkedException()).getErrorCode(); + } } } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java index 96fc865fb4..bf5d32d2e6 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java @@ -639,20 +639,23 @@ public class QpidBrokerTestCase extends QpidTestCase public void stopBroker(int port) throws Exception { - port = getPort(port); - _logger.info("stopping broker: " + getBrokerCommand(port) + " on port " + port); - Process process = _brokers.remove(port); - if (process != null) - { - process.destroy(); - process.waitFor(); - _logger.info("broker exited: " + process.exitValue()); - } - else if (_broker.equals(VM)) + if (_broker.equals(VM)) { VmBroker.killVMBroker(); } + else + { + port = getPort(port); + + Process process = _brokers.remove(port); + if (process != null) + { + process.destroy(); + process.waitFor(); + _logger.info("broker exited: " + process.exitValue()); + } + } } /** @@ -973,20 +976,22 @@ public class QpidBrokerTestCase extends QpidTestCase protected void tearDown() throws Exception { - try + // close all the connections used by this test. + for (Connection c : _connections) { - // close all the connections used by this test. - for (Connection c : _connections) - { + try + { c.close(); } + catch (Exception e) + { + _logger.warn("Error closing connection", e); + } } - finally - { - // Ensure any problems with close does not interfer with property resets - super.tearDown(); - revertLoggingLevels(); - } + + // Ensure any problems with close does not interfer with property resets + super.tearDown(); + revertLoggingLevels(); } /** diff --git a/qpid/java/test-profiles/08StandaloneExcludes b/qpid/java/test-profiles/08StandaloneExcludes index a497eaa355..5c225e3b2e 100644 --- a/qpid/java/test-profiles/08StandaloneExcludes +++ b/qpid/java/test-profiles/08StandaloneExcludes @@ -1,3 +1,22 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + //====================================================================== //Exclude the following from brokers defaulting to the 0-8 protocol //====================================================================== @@ -19,4 +38,6 @@ org.apache.qpid.test.unit.message.UTF8Test#* org.apache.qpid.client.MessageListenerTest#testSynchronousRecieveNoWait // XA Needs 0-10 -org.apache.qpid.test.unit.xa.*
\ No newline at end of file +org.apache.qpid.test.unit.xa.* + +org.apache.qpid.test.unit.client.connection.ConnectionTest#testUnsupportedSASLMechanism diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes index 711a3954e4..e89b09cca2 100755 --- a/qpid/java/test-profiles/CPPExcludes +++ b/qpid/java/test-profiles/CPPExcludes @@ -1,3 +1,22 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + org.apache.qpid.test.unit.client.channelclose.ChannelCloseTest#* org.apache.qpid.client.ResetMessageListenerTest#* diff --git a/qpid/java/test-profiles/CPPNoPrefetchExcludes b/qpid/java/test-profiles/CPPNoPrefetchExcludes index df188ef628..ebcd430161 100644 --- a/qpid/java/test-profiles/CPPNoPrefetchExcludes +++ b/qpid/java/test-profiles/CPPNoPrefetchExcludes @@ -1,3 +1,22 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + org.apache.qpid.test.unit.transacted.TransactedTest#testRollback org.apache.qpid.test.client.QueueBrowserAutoAckTest#testFailoverWithQueueBrowser diff --git a/qpid/java/test-profiles/CPPPrefetchExcludes b/qpid/java/test-profiles/CPPPrefetchExcludes index 6b0014b917..7ef52f89c7 100644 --- a/qpid/java/test-profiles/CPPPrefetchExcludes +++ b/qpid/java/test-profiles/CPPPrefetchExcludes @@ -1,3 +1,22 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + // those tests should be run with prefetch off org.apache.qpid.client.MessageListenerMultiConsumerTest#testRecieveC2Only org.apache.qpid.client.MessageListenerMultiConsumerTest#testRecieveBoth diff --git a/qpid/java/test-profiles/CPPTransientExcludes b/qpid/java/test-profiles/CPPTransientExcludes index 90b4251807..47f24db19c 100644 --- a/qpid/java/test-profiles/CPPTransientExcludes +++ b/qpid/java/test-profiles/CPPTransientExcludes @@ -1,3 +1,22 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + // those tests need durable subscribe states to be persisted org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubRestoredAfterNonPersistentMessageSent diff --git a/qpid/java/test-profiles/Excludes b/qpid/java/test-profiles/Excludes index 12905f238e..9cfc19999c 100644 --- a/qpid/java/test-profiles/Excludes +++ b/qpid/java/test-profiles/Excludes @@ -1,3 +1,22 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + //====================================================================== //These tests are *always* excluded //====================================================================== @@ -34,4 +53,4 @@ org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#* org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#* // QPID-2418 : The queue backing the dur sub is not currently deleted at subscription change, so the test will fail. -org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSelectorAndRestart
\ No newline at end of file +org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSelectorAndRestart diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes index eecffbbd2c..533e6976be 100755 --- a/qpid/java/test-profiles/Java010Excludes +++ b/qpid/java/test-profiles/Java010Excludes @@ -1,3 +1,22 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + //====================================================================== //These tests will not work over AMQP 0-10 //====================================================================== @@ -16,6 +35,9 @@ org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFails org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFailsNoRouteNoTxPubSub org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFailsNoRouteTxPubSub +// 0-10 does not have AMQProtocolHandler access +org.apache.qpid.test.unit.client.channelclose.ChannelCloseTest#* + //this test checks explicitly for 0-8 flow control semantics org.apache.qpid.test.client.FlowControlTest#* diff --git a/qpid/java/test-profiles/JavaExcludes b/qpid/java/test-profiles/JavaExcludes index 57fb525f32..b38b482be5 100644 --- a/qpid/java/test-profiles/JavaExcludes +++ b/qpid/java/test-profiles/JavaExcludes @@ -1,3 +1,22 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + //====================================================================== //These tests do not work with the Java broker //====================================================================== diff --git a/qpid/java/test-profiles/JavaInVMExcludes b/qpid/java/test-profiles/JavaInVMExcludes index c51da125be..65981e4801 100644 --- a/qpid/java/test-profiles/JavaInVMExcludes +++ b/qpid/java/test-profiles/JavaInVMExcludes @@ -1,3 +1,22 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + //====================================================================== //Exclude the following tests when running the InVM default test profile //====================================================================== diff --git a/qpid/java/test-profiles/JavaPersistentExcludes b/qpid/java/test-profiles/JavaPersistentExcludes index 54650648ed..0c7be9e509 100644 --- a/qpid/java/test-profiles/JavaPersistentExcludes +++ b/qpid/java/test-profiles/JavaPersistentExcludes @@ -1,3 +1,22 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + //====================================================================== //These tests require the MemoryMessageStore //====================================================================== diff --git a/qpid/java/test-profiles/JavaStandaloneExcludes b/qpid/java/test-profiles/JavaStandaloneExcludes index ca3a872119..4ec9f1fc02 100644 --- a/qpid/java/test-profiles/JavaStandaloneExcludes +++ b/qpid/java/test-profiles/JavaStandaloneExcludes @@ -1,3 +1,22 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + //====================================================================== //These tests require an InVm broker //====================================================================== diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes index f192aadccd..7f2f1c2d90 100644 --- a/qpid/java/test-profiles/JavaTransientExcludes +++ b/qpid/java/test-profiles/JavaTransientExcludes @@ -1,3 +1,22 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + //====================================================================== //These tests require a persistent store //====================================================================== diff --git a/qpid/java/test-profiles/XAExcludes b/qpid/java/test-profiles/XAExcludes index 1bb26c5f27..907864a730 100644 --- a/qpid/java/test-profiles/XAExcludes +++ b/qpid/java/test-profiles/XAExcludes @@ -1,3 +1,22 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + org.apache.qpid.test.unit.xa.QueueTest#* org.apache.qpid.test.unit.xa.TopicTest#* org.apache.qpid.test.unit.xa.FaultTest#* diff --git a/qpid/java/test-profiles/cpp.async.excludes b/qpid/java/test-profiles/cpp.async.excludes index b6479a00ba..d700538345 100644 --- a/qpid/java/test-profiles/cpp.async.excludes +++ b/qpid/java/test-profiles/cpp.async.excludes @@ -1,2 +1,21 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + // the C++ broker doesn't guarantee the order of messages on recovery org.apache.qpid.test.unit.xa.TopicTest#testMultiMessagesDurSubCrash diff --git a/qpid/java/test-profiles/cpp.async.testprofile b/qpid/java/test-profiles/cpp.async.testprofile index ac8b98471e..c3d47f0ce6 100644 --- a/qpid/java/test-profiles/cpp.async.testprofile +++ b/qpid/java/test-profiles/cpp.async.testprofile @@ -1,3 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# include=cpp profile.excludes=CPPPrefetchExcludes broker.modules=--load-module ${broker.module.store} diff --git a/qpid/java/test-profiles/cpp.cluster.testprofile b/qpid/java/test-profiles/cpp.cluster.testprofile index 4bfd4f69a2..22a082e85b 100644 --- a/qpid/java/test-profiles/cpp.cluster.testprofile +++ b/qpid/java/test-profiles/cpp.cluster.testprofile @@ -1,3 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# include=cpp broker.modules=--load-module ${broker.module.cluster} --cluster-name cpp-java-test-cluster diff --git a/qpid/java/test-profiles/cpp.excludes b/qpid/java/test-profiles/cpp.excludes index 64417a0edc..c8fae1797e 100644 --- a/qpid/java/test-profiles/cpp.excludes +++ b/qpid/java/test-profiles/cpp.excludes @@ -1,3 +1,22 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + //====================================================================== //Exclude the following tests when running all cpp test profilies //====================================================================== diff --git a/qpid/java/test-profiles/cpp.noprefetch.testprofile b/qpid/java/test-profiles/cpp.noprefetch.testprofile index b43bdd5722..4764cb576b 100644 --- a/qpid/java/test-profiles/cpp.noprefetch.testprofile +++ b/qpid/java/test-profiles/cpp.noprefetch.testprofile @@ -1,3 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# include=cpp profile.excludes=CPPTransientExcludes CPPNoPrefetchExcludes max_prefetch=0 diff --git a/qpid/java/test-profiles/cpp.ssl.excludes b/qpid/java/test-profiles/cpp.ssl.excludes index 1828581d55..4d499c57b9 100644 --- a/qpid/java/test-profiles/cpp.ssl.excludes +++ b/qpid/java/test-profiles/cpp.ssl.excludes @@ -1 +1,20 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + #org.apache.qpid.test.client.failover.FailoverTest#* diff --git a/qpid/java/test-profiles/cpp.ssl.testprofile b/qpid/java/test-profiles/cpp.ssl.testprofile index 9f2581a83a..bf71384835 100644 --- a/qpid/java/test-profiles/cpp.ssl.testprofile +++ b/qpid/java/test-profiles/cpp.ssl.testprofile @@ -1,3 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# include=cpp broker.modules=--load-module ${broker.module.ssl} --ssl-cert-name localhost.localdomain --ssl-cert-password-file ${test.profiles}/test_resources/ssl/pfile --ssl-cert-db ${test.profiles}/test_resources/ssl/server_db/ --ssl-require-client-authentication --ssl-port @SSL_PORT diff --git a/qpid/java/test-profiles/cpp.testprofile b/qpid/java/test-profiles/cpp.testprofile index f6d8d6f353..694e22f48c 100644 --- a/qpid/java/test-profiles/cpp.testprofile +++ b/qpid/java/test-profiles/cpp.testprofile @@ -1,3 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# broker.version=0-10 broker.language=cpp diff --git a/qpid/java/test-profiles/default.0.10.testprofile b/qpid/java/test-profiles/default.0.10.testprofile index 2cec26d632..d14042f786 100644 --- a/qpid/java/test-profiles/default.0.10.testprofile +++ b/qpid/java/test-profiles/default.0.10.testprofile @@ -3,3 +3,5 @@ broker.version=0-10 qpid.amqp.version=0-10 amqj.protocolprovider.class=org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory profile.excludes=JavaTransientExcludes JavaInVMExcludes Java010Excludes +amqj.protocol.debug=true +#qpid.transport.outgoing=org.apache.qpid.transport.network.io.IoNetworkTransport
\ No newline at end of file diff --git a/qpid/java/test-profiles/default.testprofile b/qpid/java/test-profiles/default.testprofile index 9612aabf5f..2b5c552504 100644 --- a/qpid/java/test-profiles/default.testprofile +++ b/qpid/java/test-profiles/default.testprofile @@ -1,3 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# java.naming.factory.initial=org.apache.qpid.jndi.PropertiesFileInitialContextFactory java.naming.provider.url=${test.profiles}/test-provider.properties @@ -41,4 +59,5 @@ haltonfailure=no haltonerror=no exclude.modules=none -profile.clustered=false
\ No newline at end of file +profile.clustered=false +amqj.protocol.debug=true diff --git a/qpid/java/test-profiles/java-derby.0.10.testprofile b/qpid/java/test-profiles/java-derby.0.10.testprofile index 8c53a9423a..ca9115d30d 100644 --- a/qpid/java/test-profiles/java-derby.0.10.testprofile +++ b/qpid/java/test-profiles/java-derby.0.10.testprofile @@ -1,3 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# broker.language=java broker.version=0-10 broker=${project.root}/build/bin/qpid-server -p @PORT -m @MPORT -c @CONFIG_FILE -l ${test.profiles}/log4j-test.xml diff --git a/qpid/java/test-profiles/java-derby.testprofile b/qpid/java/test-profiles/java-derby.testprofile index a88f2d852d..d22e35f07e 100644 --- a/qpid/java/test-profiles/java-derby.testprofile +++ b/qpid/java/test-profiles/java-derby.testprofile @@ -1,3 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# broker.language=java broker=${project.root}/build/bin/qpid-server -p @PORT -m @MPORT --exclude-0-10 @PORT -c @CONFIG_FILE -l ${test.profiles}/log4j-test.xml broker.clean=${test.profiles}/clean-dir ${build.data} ${project.root}/build/work diff --git a/qpid/java/test-profiles/java.0.10.testprofile b/qpid/java/test-profiles/java.0.10.testprofile index eb615d80d9..a1743eb020 100644 --- a/qpid/java/test-profiles/java.0.10.testprofile +++ b/qpid/java/test-profiles/java.0.10.testprofile @@ -1,3 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# broker.language=java broker.version=0-10 broker=${project.root}/build/bin/qpid-server -p @PORT -m @MPORT -c @CONFIG_FILE -l ${test.profiles}/log4j-test.xml diff --git a/qpid/java/test-profiles/java.testprofile b/qpid/java/test-profiles/java.testprofile index c7d6725d68..c8c776d3e1 100644 --- a/qpid/java/test-profiles/java.testprofile +++ b/qpid/java/test-profiles/java.testprofile @@ -1,3 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# broker.language=java broker=${project.root}/build/bin/qpid-server -p @PORT -m @MPORT --exclude-0-10 @PORT -c @CONFIG_FILE -l ${test.profiles}/log4j-test.xml broker.clean=${test.profiles}/clean-dir ${build.data} ${project.root}/build/work diff --git a/qpid/java/test-profiles/test_resources/ssl/app1.crt b/qpid/java/test-profiles/test_resources/ssl/app1.crt index 52004f4dd1..c04b07fce8 100644 --- a/qpid/java/test-profiles/test_resources/ssl/app1.crt +++ b/qpid/java/test-profiles/test_resources/ssl/app1.crt @@ -1,14 +1,15 @@ -----BEGIN CERTIFICATE----- -MIICGjCCAYOgAwIBAgIFAJFV5bcwDQYJKoZIhvcNAQEFBQAwQTELMAkGA1UEBhMC
+MIICPjCCAaegAwIBAgIFAJJRUVUwDQYJKoZIhvcNAQEFBQAwQTELMAkGA1UEBhMC
Q0ExEDAOBgNVBAgTB09udGFyaW8xDTALBgNVBAoTBEFDTUUxETAPBgNVBAMTCE15
-Um9vdENBMB4XDTEwMDcwNjAyNTQ1OFoXDTEwMTAwNjAyNTQ1OFowYTELMAkGA1UE
-BhMCY2ExCzAJBgNVBAgTAm9uMRAwDgYDVQQHEwd0b3JvbnRvMQ0wCwYDVQQKEwRh
+Um9vdENBMB4XDTEwMTAxNDAyMzM1NloXDTE1MTAxNDAyMzM1NlowYTELMAkGA1UE
+BhMCQ0ExCzAJBgNVBAgTAk9OMRAwDgYDVQQHEwdUb3JvbnRvMQ0wCwYDVQQKEwRh
Y21lMQwwCgYDVQQLEwNhcnQxFjAUBgNVBAMMDWFwcDFAYWNtZS5vcmcwgZ8wDQYJ
-KoZIhvcNAQEBBQADgY0AMIGJAoGBAKHJI2XPwISkvic93ICjk9JdYkGeYR4hNK1N
-JYCgzfNAJDq5xxYPkJWd9kc8+nlT0f//nPUVVPtYzGgw/WV0J01Wp8pOJZRdOzYk
-LjOdaJE8vCoL+EMeYVSEgx2XebEV2l7d7z3gGVFKAmQfEhveNxBWNyJ/o9ELapEF
-y1bsPqhjAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAfhj5E7KYqBLOfbOP1DjM1RQ1
-unG/yEbpN+hk0QNN7FHObSHvRfzSfVrZRxFKvZR8o4yN2RL39jkWsq92GGFSlQzF
-pqGA7YjR1j4UGkY3xib3Vr1PsDZWqmH3CjxXTdo0Y28LtQ/QMt58c0wcwFwMCONJ
-ynb4emD3n6Pw7GjyTYg= +KoZIhvcNAQEBBQADgY0AMIGJAoGBAIlYzFnmAsv/Ci4rgp3sWwkFGFYEBwiXx0Xz
+auZ10nrOUz6Ce2FGVQBYFA09zi79iUyn86oLuTY0Kc/1emCZEPkmOW+hw1uk/TxG
+5MqpEOZdsDv4xIqBHgtWv/d3kGubwSS5lia1l6EPvnzHvsQSM//xhkrJaF0fAHx5
+FMkilnvfAgMBAAGjIjAgMAkGA1UdEwQCMAAwEwYDVR0lBAwwCgYIKwYBBQUHAwIw
+DQYJKoZIhvcNAQEFBQADgYEAJ47Q/4/hJMwTTpfcojv9KbZUTrve/wkabUrytNf3
+ogqhaIzgUr+vA9EMBc91Jg1WJC/0VMmTrTEggqrgd/prg4xcyATQOwNR1TiaWC4E
+r3pWEpZZnEJSd4vtcciNFNsbuAt2m4Nc90gPNXKgNoe0+3nuxPLs/TIauwOSDF+I
+oiw= -----END CERTIFICATE----- diff --git a/qpid/java/test-profiles/test_resources/ssl/app1.req b/qpid/java/test-profiles/test_resources/ssl/app1.req index f647ffb6e9..b4f1ff9a2c 100644 --- a/qpid/java/test-profiles/test_resources/ssl/app1.req +++ b/qpid/java/test-profiles/test_resources/ssl/app1.req @@ -1,10 +1,10 @@ -----BEGIN NEW CERTIFICATE REQUEST----- -MIIBoTCCAQoCAQAwYTELMAkGA1UEBhMCY2ExCzAJBgNVBAgTAm9uMRAwDgYDVQQHEwd0b3JvbnRv +MIIBoTCCAQoCAQAwYTELMAkGA1UEBhMCQ0ExCzAJBgNVBAgTAk9OMRAwDgYDVQQHEwdUb3JvbnRv MQ0wCwYDVQQKEwRhY21lMQwwCgYDVQQLEwNhcnQxFjAUBgNVBAMMDWFwcDFAYWNtZS5vcmcwgZ8w -DQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAKHJI2XPwISkvic93ICjk9JdYkGeYR4hNK1NJYCgzfNA -JDq5xxYPkJWd9kc8+nlT0f//nPUVVPtYzGgw/WV0J01Wp8pOJZRdOzYkLjOdaJE8vCoL+EMeYVSE -gx2XebEV2l7d7z3gGVFKAmQfEhveNxBWNyJ/o9ELapEFy1bsPqhjAgMBAAGgADANBgkqhkiG9w0B -AQQFAAOBgQAgmtr+de8dmT1zYKOOlMZNh9w9FJ/qsrk0Fj6yC8f1QKv2ZE8de5p62U7PKzbLzDML -kmiU9qSHzuucH3Za9zprQ/5t9zIffO2kr+OgPIzgwdNPjVfH5SQrZlZHyVI9lC/0Ou9uJPScj3Qm -B+lQOmY/tP854g+gqX7drBsP4pQHug== +DQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAIlYzFnmAsv/Ci4rgp3sWwkFGFYEBwiXx0XzauZ10nrO +Uz6Ce2FGVQBYFA09zi79iUyn86oLuTY0Kc/1emCZEPkmOW+hw1uk/TxG5MqpEOZdsDv4xIqBHgtW +v/d3kGubwSS5lia1l6EPvnzHvsQSM//xhkrJaF0fAHx5FMkilnvfAgMBAAGgADANBgkqhkiG9w0B +AQQFAAOBgQADKx89mTCGIbrCE6lICLYDexGxexeaZaUDq7YgtyXVIs2wcVGcZJGolUARopMWgE+y +ryHTC4nvNCaBULyXGrzwPfzTJaVBiY4V5BoTrmz6Ofd73ZO6ZYNhy9bVLrb5VtDyldCj0EWz2lBe ++OzVUeII5KPopRtzXpMH3sB2OredUg== -----END NEW CERTIFICATE REQUEST----- diff --git a/qpid/java/test-profiles/test_resources/ssl/app2.crt b/qpid/java/test-profiles/test_resources/ssl/app2.crt index 641e2c89e1..5c889a4c31 100644 --- a/qpid/java/test-profiles/test_resources/ssl/app2.crt +++ b/qpid/java/test-profiles/test_resources/ssl/app2.crt @@ -1,14 +1,15 @@ -----BEGIN CERTIFICATE----- -MIICGjCCAYOgAwIBAgIFAJFV5aIwDQYJKoZIhvcNAQEFBQAwQTELMAkGA1UEBhMC
+MIICPjCCAaegAwIBAgIFAJJRUXgwDQYJKoZIhvcNAQEFBQAwQTELMAkGA1UEBhMC
Q0ExEDAOBgNVBAgTB09udGFyaW8xDTALBgNVBAoTBEFDTUUxETAPBgNVBAMTCE15
-Um9vdENBMB4XDTEwMDcwNjAyNTQ0N1oXDTEwMTAwNjAyNTQ0N1owYTELMAkGA1UE
-BhMCY2ExCzAJBgNVBAgTAm9uMRAwDgYDVQQHEwd0b3JvbnRvMQ0wCwYDVQQKEwRh
+Um9vdENBMB4XDTEwMTAxNDAyMzQxNVoXDTE1MTAxNDAyMzQxNVowYTELMAkGA1UE
+BhMCQ0ExCzAJBgNVBAgTAk9OMRAwDgYDVQQHEwdUb3JvbnRvMQ0wCwYDVQQKEwRh
Y21lMQwwCgYDVQQLEwNhcnQxFjAUBgNVBAMMDWFwcDJAYWNtZS5vcmcwgZ8wDQYJ
-KoZIhvcNAQEBBQADgY0AMIGJAoGBAMqo9Z/4mDtK9/NpIMNa7h91aUIYNClV36V7
-iHFzxGnw6ubWb6FB6uEO2KFnjk+Jd0sUEbZI3OCjltbfqGBv8UDZ+3+vMF4HrGJ9
-+5YilFbhvZ8FGWCFjjh9gV0W0ptfsskcw0KCxmeHGHP8RbHoKS4Y79D2bkW1ovO6
-FzFx3uRfAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAXn3f8znVyItIDcC/4zjLczP8
-EbKEpRW656HccDTGygsfK+epyA8CO8RAtddW7epd1z0FCWakd2078pBe225w8/gA
-PQDLlfi1vgAxwhh7xZz1UvtkT9scU/GTdmgg5lZYDBeCDVJ3kuY3t5yg47L3Xuwe
-WutGKNQMrJlUfFUNG70= +KoZIhvcNAQEBBQADgY0AMIGJAoGBAJcIo3TSYxDa1OfmnDEP4qzLxmgyXC3n0Evu
+2nJz0s5zljjItnwJ9UpOkYh/PQcpUWoM+qKeZYadXbGhp8M8nMrJtUPOAKgDmF6A
+DKS9WL7u8kVCcEvBzLRD7bftEm2IPaRu72wOQai76hj11rYWHHkdAPem+C4ODqVn
+y2NN3zDnAgMBAAGjIjAgMAkGA1UdEwQCMAAwEwYDVR0lBAwwCgYIKwYBBQUHAwIw
+DQYJKoZIhvcNAQEFBQADgYEAc5FG8sDbK+i1703rJEwjJ9dCVXljN2jYL1sGXO2o
+9O5Da0zKcQ+OMhLJUoJf38pJw+maYhtT0fKFLItXP/rlyWlaGRBjkcZjOZ2D/Hg5
+/8pEVwiyTYRoEnGKRawnedIbEyBAcgtnlbkTFWXtQmnmgVApSzTpALRn5/jUC1PU
+Y3g= -----END CERTIFICATE----- diff --git a/qpid/java/test-profiles/test_resources/ssl/app2.req b/qpid/java/test-profiles/test_resources/ssl/app2.req index 52d1a7b8e0..53f3494168 100644 --- a/qpid/java/test-profiles/test_resources/ssl/app2.req +++ b/qpid/java/test-profiles/test_resources/ssl/app2.req @@ -1,10 +1,10 @@ -----BEGIN NEW CERTIFICATE REQUEST----- -MIIBoTCCAQoCAQAwYTELMAkGA1UEBhMCY2ExCzAJBgNVBAgTAm9uMRAwDgYDVQQHEwd0b3JvbnRv +MIIBoTCCAQoCAQAwYTELMAkGA1UEBhMCQ0ExCzAJBgNVBAgTAk9OMRAwDgYDVQQHEwdUb3JvbnRv MQ0wCwYDVQQKEwRhY21lMQwwCgYDVQQLEwNhcnQxFjAUBgNVBAMMDWFwcDJAYWNtZS5vcmcwgZ8w -DQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAMqo9Z/4mDtK9/NpIMNa7h91aUIYNClV36V7iHFzxGnw -6ubWb6FB6uEO2KFnjk+Jd0sUEbZI3OCjltbfqGBv8UDZ+3+vMF4HrGJ9+5YilFbhvZ8FGWCFjjh9 -gV0W0ptfsskcw0KCxmeHGHP8RbHoKS4Y79D2bkW1ovO6FzFx3uRfAgMBAAGgADANBgkqhkiG9w0B -AQQFAAOBgQC3rWDpHak7fbBf+FvdaqxEoIw+g43RsaDqdGX9ZJJ9ybDi50Xy/YzLiP5vRl3XU8mI -EoqN8ioZl83UXh95Lb6eW/S+ELgiwQh8npblRGpd/IobdKjEAKV1+i3reYqpsYI5L/8JNbcyIT4A -QOTc9itCc7O+klJzkmLqqpmlHhYX5A== +DQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAJcIo3TSYxDa1OfmnDEP4qzLxmgyXC3n0Evu2nJz0s5z +ljjItnwJ9UpOkYh/PQcpUWoM+qKeZYadXbGhp8M8nMrJtUPOAKgDmF6ADKS9WL7u8kVCcEvBzLRD +7bftEm2IPaRu72wOQai76hj11rYWHHkdAPem+C4ODqVny2NN3zDnAgMBAAGgADANBgkqhkiG9w0B +AQQFAAOBgQAGNtSvXwdyujmMTaVQj2M2jZkgnVFtMBjDDmdz+wgzu8fKaej7e+fJi5owf31wJUUP +0Zi/6mBNj+blmqHjNQ9U3w9Rns0z3+1DbO3Yj48d75IuxQJJd+lXXjCFi2qSBhaNUwyOpzaI1AQo +JJTC1/WMaPENU9bgYYsOrmIhnbt5rQ== -----END NEW CERTIFICATE REQUEST----- diff --git a/qpid/java/test-profiles/test_resources/ssl/keystore.jks b/qpid/java/test-profiles/test_resources/ssl/keystore.jks Binary files differindex 5e0c2451e8..e3a850a248 100644 --- a/qpid/java/test-profiles/test_resources/ssl/keystore.jks +++ b/qpid/java/test-profiles/test_resources/ssl/keystore.jks diff --git a/qpid/java/testkit/bin/qpid-python-testkit b/qpid/java/tools/bin/qpid-python-testkit index 2c1d015281..cbe7972421 100755 --- a/qpid/java/testkit/bin/qpid-python-testkit +++ b/qpid/java/tools/bin/qpid-python-testkit @@ -22,9 +22,9 @@ # via the python test runner. The defaults are set for a running # from an svn checkout -. ./setenv.sh +. ./set-testkit-env.sh -export PYTHONPATH=../:$PYTHONPATH +export PYTHONPATH=./:$PYTHONPATH rm -rf $OUTDIR -$PYTHON_DIR/qpid-python-test -DOUTDIR=$OUTDIR -m testkit "$@" +qpid-python-test -DOUTDIR=$OUTDIR -m testkit "$@" diff --git a/qpid/java/testkit/bin/setenv.sh b/qpid/java/tools/bin/set-testkit-env.sh index e6a726eef1..051dad8179 100644 --- a/qpid/java/testkit/bin/setenv.sh +++ b/qpid/java/tools/bin/set-testkit-env.sh @@ -62,11 +62,11 @@ fi if [ "$STORE_LIB" = "" ] ; then if [ -e /usr/lib64/qpid/daemon/msgstore.so ] ; then - CLUSTER_LIB="/usr/lib64/qpid/daemon/msgstore.so" + STORE_LIB="/usr/lib64/qpid/daemon/msgstore.so" elif [ -e /usr/lib/qpid/daemon/msgstore.so ] ; then - CLUSTER_LIB="/usr/lib/qpid/daemon/msgstore.so" - else - echo "WARNING: skipping test, STORE_LIB not set and msgstore.so not found."; exit 0; + STORE_LIB="/usr/lib/qpid/daemon/msgstore.so" + #else + # echo "WARNING: skipping test, STORE_LIB not set and msgstore.so not found."; exit 0; fi fi @@ -82,7 +82,7 @@ if [ "$QP_CP" = "" ] ; then fi if [ "$OUTDIR" = "" ] ; then - OUTDIR=`abs_path "../output"` + OUTDIR=`abs_path "./output"` fi export PYTHONPATH PYTHON_DIR QPIDD_EXEC CLUSTER_LIB QP_CP OUTDIR diff --git a/qpid/java/tools/bin/testkit.py b/qpid/java/tools/bin/testkit.py new file mode 100755 index 0000000000..1c2ad598b8 --- /dev/null +++ b/qpid/java/tools/bin/testkit.py @@ -0,0 +1,278 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import time, string, traceback +from brokertest import * +from qpid.messaging import * + + +try: + import java.lang.System + _cp = java.lang.System.getProperty("java.class.path"); +except ImportError: + _cp = checkenv("QP_CP") + +class Formatter: + + def __init__(self, message): + self.message = message + self.environ = {"M": self.message, + "P": self.message.properties, + "C": self.message.content} + + def __getitem__(self, st): + return eval(st, self.environ) + +# The base test case has support for launching the generic +# receiver and sender through the TestLauncher with all the options. +# +class JavaClientTest(BrokerTest): + """Base Case for Java Test cases""" + + client_class = "org.apache.qpid.testkit.TestLauncher" + + # currently there is no transparent reconnection. + # temp hack: just creating the queue here and closing it. + def start_error_watcher(self,broker=None): + ssn = broker.connect().session() + err_watcher = ssn.receiver("control; {create:always}", capacity=1) + ssn.close() + + def store_module_args(self): + if BrokerTest.store_lib: + return ["--load-module", BrokerTest.store_lib] + else: + print "Store module not present." + return [""] + + def client(self,**options): + cmd = ["java","-cp",_cp] + + cmd += ["-Dtest_name=" + options.get("test_name", "UNKNOWN")] + cmd += ["-Dhost=" + options.get("host","127.0.0.1")] + cmd += ["-Dport=" + str(options.get("port",5672))] + cmd += ["-Dcon_count=" + str(options.get("con_count",1))] + cmd += ["-Dssn_per_con=" + str(options.get("ssn_per_con",1))] + cmd += ["-Duse_unique_dests=" + str(options.get("use_unique_dests",False))] + cmd += ["-Dcheck_for_dups=" + str(options.get("check_for_dups",False))] + cmd += ["-Ddurable=" + str(options.get("durable",False))] + cmd += ["-Dtransacted=" + str(options.get("transacted",False))] + cmd += ["-Dreceiver=" + str(options.get("receiver",False))] + cmd += ["-Dsync_rcv=" + str(options.get("sync_rcv",False))] + cmd += ["-Dsender=" + str(options.get("sender",False))] + cmd += ["-Dmsg_size=" + str(options.get("msg_size",256))] + cmd += ["-Dtx_size=" + str(options.get("tx_size",10))] + cmd += ["-Dmsg_count=" + str(options.get("msg_count",1000))] + cmd += ["-Dmax_prefetch=" + str(options.get("max_prefetch",500))] + cmd += ["-Dsync_ack=" + str(options.get("sync_ack",False))] + cmd += ["-Dsync_persistence=" + str(options.get("sync_pub",False))] + cmd += ["-Dsleep_time=" + str(options.get("sleep_time",1000))] + cmd += ["-Dfailover=" + options.get("failover", "failover_exchange")] + cmd += ["-Djms_durable_sub=" + str(options.get("jms_durable_sub", False))] + cmd += ["-Dlog.level=" + options.get("log.level", "warn")] + cmd += [self.client_class] + cmd += [options.get("address", "my_queue; {create: always}")] + + #print str(options.get("port",5672)) + return cmd + + # currently there is no transparent reconnection. + # temp hack: just creating a receiver and closing session soon after. + def monitor_clients(self,broker=None,run_time=600,error_ck_freq=60): + ssn = broker.connect().session() + err_watcher = ssn.receiver("control; {create:always}", capacity=1) + i = run_time/error_ck_freq + is_error = False + for j in range(i): + not_empty = True + while not_empty: + try: + m = err_watcher.fetch(timeout=error_ck_freq) + ssn.acknowledge() + print "Java process notified of an error" + self.print_error(m) + is_error = True + except messaging.Empty, e: + not_empty = False + + ssn.close() + return is_error + + def print_error(self,msg): + print msg.properties.get("exception-trace") + + def verify(self, receiver,sender): + sender_running = receiver.is_running() + receiver_running = sender.is_running() + + self.assertTrue(receiver_running,"Receiver has exited prematually") + self.assertTrue(sender_running,"Sender has exited prematually") + + def start_sender_and_receiver(self,**options): + + receiver_opts = options + receiver_opts["receiver"]=True + receiver = self.popen(self.client(**receiver_opts), + expect=EXPECT_RUNNING) + + sender_opts = options + sender_opts["sender"]=True + sender = self.popen(self.client(**sender_opts), + expect=EXPECT_RUNNING) + + return receiver, sender + + def start_cluster(self,count=2,expect=EXPECT_RUNNING,**options): + if options.get("durable",False)==True: + cluster = Cluster(self, count=count, expect=expect, args=self.store_module_args()) + else: + cluster = Cluster(self, count=count) + return cluster + +class ConcurrencyTest(JavaClientTest): + """A concurrency test suite for the JMS client""" + skip = False + + def base_case(self,**options): + if self.skip : + print "Skipping test" + return + + cluster = self.start_cluster(count=2,**options) + self.start_error_watcher(broker=cluster[0]) + options["port"] = port=cluster[0].port() + + options["use_unique_dests"]=True + options["address"]="amq.topic" + receiver, sender = self.start_sender_and_receiver(**options) + self.monitor_clients(broker=cluster[0],run_time=180) + self.verify(receiver,sender) + + def test_multiplexing_con(self): + """Tests multiple sessions on a single connection""" + + self.base_case(ssn_per_con=25,test_name=self.id()) + + def test_multiplexing_con_with_tx(self): + """Tests multiple transacted sessions on a single connection""" + + self.base_case(ssn_per_con=25,transacted=True,test_name=self.id()) + + def test_multiplexing_con_with_sync_rcv(self): + """Tests multiple sessions with sync receive""" + + self.base_case(ssn_per_con=25,sync_rcv=True,test_name=self.id()) + + def test_multiplexing_con_with_durable_sub(self): + """Tests multiple sessions with durable subs""" + + self.base_case(ssn_per_con=25,durable=True,jms_durable_sub=True,test_name=self.id()) + + def test_multiplexing_con_with_sync_ack(self): + """Tests multiple sessions with sync ack""" + + self.base_case(ssn_per_con=25,sync_ack=True,test_name=self.id()) + + def test_multiplexing_con_with_sync_pub(self): + """Tests multiple sessions with sync pub""" + + self.base_case(ssn_per_con=25,sync_pub=True,durable=True,test_name=self.id()) + + def test_multiple_cons_and_ssns(self): + """Tests multiple connections and sessions""" + + self.base_case(con_count=10,ssn_per_con=25,test_name=self.id()) + + +class SoakTest(JavaClientTest): + """A soak test suite for the JMS client""" + + def base_case(self,**options): + cluster = self.start_cluster(count=4, expect=EXPECT_EXIT_FAIL,**options) + options["port"] = port=cluster[0].port() + self.start_error_watcher(broker=cluster[0]) + options["use_unique_dests"]=True + options["address"]="amq.topic" + receiver,sender = self.start_sender_and_receiver(**options) + is_error = self.monitor_clients(broker=cluster[0],run_time=30,error_ck_freq=30) + + if (is_error): + print "The sender or receiver didn't start properly. Exiting test." + return + else: + "Print no error !" + + # grace period for java clients to get the failover properly setup. + time.sleep(30) + error_msg= None + # Kill original brokers, start new ones. + try: + for i in range(8): + cluster[i].kill() + b=cluster.start() + self.monitor_clients(broker=b,run_time=30,error_ck_freq=30) + print "iteration : " + str(i) + except ConnectError, e1: + error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1) + + except SessionError, e2: + error_msg = "Session error while connected to new cluster node : " + traceback.format_exc(e2) + + self.verify(receiver,sender) + if error_msg: + raise Exception(error_msg) + + + def test_failover(self) : + """Test basic failover""" + + self.base_case(test_name=self.id()) + + + def test_failover_with_durablesub(self): + """Test failover with durable subscriber""" + + self.base_case(durable=True,jms_durable_sub=True,test_name=self.id()) + + + def test_failover_with_sync_rcv(self): + """Test failover with sync receive""" + + self.base_case(sync_rcv=True,test_name=self.id()) + + + def test_failover_with_sync_ack(self): + """Test failover with sync ack""" + + self.base_case(sync_ack=True,test_name=self.id()) + + + def test_failover_with_noprefetch(self): + """Test failover with no prefetch""" + + self.base_case(max_prefetch=1,test_name=self.id()) + + + def test_failover_with_multiple_cons_and_ssns(self): + """Test failover with multiple connections and sessions""" + + self.base_case(use_unique_dests=True,address="amq.topic", + con_count=10,ssn_per_con=25,test_name=self.id()) diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java index b10129d855..b10129d855 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java index dbc73c404f..dbc73c404f 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java new file mode 100644 index 0000000000..b4294ee4cc --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java @@ -0,0 +1,216 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit; + + +import java.util.ArrayList; +import java.util.List; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; + +import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AMQConnection; + +/** + * A generic receiver which consumes messages + * from a given address in a broker (host/port) + * until told to stop by killing it. + * + * It participates in a feedback loop to ensure the producer + * doesn't fill up the queue. If it receives an "End" msg + * it sends a reply to the replyTo address in that msg. + * + * It doesn't check for correctness or measure anything + * leaving those concerns to another entity. + * However it prints a timestamp every x secs(-Dreport_frequency) + * as checkpoint to figure out how far the test has progressed if + * a failure occurred. + * + * It also takes in an optional Error handler to + * pass out any error in addition to writing them to std err. + * + * This is intended more as building block to create + * more complex test cases. However there is a main method + * provided to use this standalone. + * + * The following options are available and configurable + * via jvm args. + * + * sync_rcv - Whether to consume sync (instead of using a listener). + * report_frequency - how often a timestamp is printed + * durable + * transacted + * tx_size - size of transaction batch in # msgs. * + * check_for_dups - check for duplicate messages and out of order messages. + * jms_durable_sub - create a durable subscription instead of a regular subscription. + */ +public class Receiver extends Client implements MessageListener +{ + long msg_count = 0; + int sequence = 0; + boolean syncRcv = Boolean.getBoolean("sync_rcv"); + boolean jmsDurableSub = Boolean.getBoolean("jms_durable_sub"); + boolean checkForDups = Boolean.getBoolean("check_for_dups"); + MessageConsumer consumer; + List<Integer> duplicateMessages = new ArrayList<Integer>(); + + public Receiver(Connection con,String addr) throws Exception + { + super(con); + setSsn(con.createSession(isTransacted(), getAck_mode())); + consumer = getSsn().createConsumer(new AMQAnyDestination(addr)); + if (!syncRcv) + { + consumer.setMessageListener(this); + } + + System.out.println("Receiving messages from : " + addr); + } + + public void onMessage(Message msg) + { + handleMessage(msg); + } + + public void run() throws Exception + { + long sleepTime = getReportFrequency(); + while(true) + { + if(syncRcv) + { + long t = sleepTime; + while (t > 0) + { + long start = System.currentTimeMillis(); + Message msg = consumer.receive(t); + t = t - (System.currentTimeMillis() - start); + handleMessage(msg); + } + } + Thread.sleep(sleepTime); + System.out.println(getDf().format(System.currentTimeMillis()) + + " - messages received : " + msg_count); + } + } + + private void handleMessage(Message m) + { + if (m == null) { return; } + + try + { + if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End")) + { + MessageProducer temp = getSsn().createProducer(m.getJMSReplyTo()); + Message controlMsg = getSsn().createTextMessage(); + temp.send(controlMsg); + if (isTransacted()) + { + getSsn().commit(); + } + temp.close(); + } + else + { + + int seq = m.getIntProperty("sequence"); + if (checkForDups) + { + if (seq == 0) + { + sequence = 0; // wrap around for each iteration + System.out.println("Received " + duplicateMessages.size() + " duplicate messages during the iteration"); + duplicateMessages.clear(); + } + + if (seq < sequence) + { + duplicateMessages.add(seq); + } + else if (seq == sequence) + { + sequence++; + msg_count ++; + } + else + { + // Multiple publishers are not allowed in this test case. + // So out of order messages are not allowed. + throw new Exception(": Received an out of order message (expected=" + + sequence + ",received=" + seq + ")" ); + } + } + else + { + msg_count ++; + } + + // Please note that this test case doesn't expect duplicates + // When testing for transactions. + if (isTransacted() && msg_count % getTxSize() == 0) + { + getSsn().commit(); + } + } + } + catch (Exception e) + { + e.printStackTrace(); + handleError("Exception receiving messages",e); + } + } + + // Receiver host port address + public static void main(String[] args) throws Exception + { + String host = "127.0.0.1"; + int port = 5672; + String addr = "message_queue"; + + if (args.length > 0) + { + host = args[0]; + } + if (args.length > 1) + { + port = Integer.parseInt(args[1]); + } + if (args.length > 2) + { + addr = args[2]; + } + + AMQConnection con = new AMQConnection( + "amqp://username:password@topicClientid/test?brokerlist='tcp://" + + host + ":" + port + "'"); + + Receiver rcv = new Receiver(con,addr); + rcv.run(); + } + +} diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java index 14b9b7302f..14b9b7302f 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java new file mode 100644 index 0000000000..36ae7cad42 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java @@ -0,0 +1,370 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit; + + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.text.DateFormat; +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.log4j.BasicConfigurator; +import org.apache.log4j.ConsoleAppender; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.PatternLayout; +import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.thread.Threading; + +/** + * A basic test case class that could launch a Sender/Receiver + * or both, each on it's own separate thread. + * + * If con_count == ssn_count, then each entity created will have + * it's own Connection. Else if con_count < ssn_count, then + * a connection will be shared by ssn_count/con_count # of entities. + * + * The if both sender and receiver options are set, it will + * share a connection. + * + * The following options are available as jvm args + * host, port + * con_count,ssn_count + * con_idle_time - which determines heartbeat + * sender, receiver - booleans which indicate which entity to create. + * Setting them both is also a valid option. + */ +public class TestLauncher implements ErrorHandler +{ + protected String host = "127.0.0.1"; + protected int port = 5672; + protected int sessions_per_con = 1; + protected int connection_count = 1; + protected long heartbeat = 5000; + protected boolean sender = false; + protected boolean receiver = false; + protected boolean useUniqueDests = false; + protected String url; + + protected String address = "my_queue; {create: always}"; + protected boolean durable = false; + protected String failover = ""; + protected AMQConnection controlCon; + protected Destination controlDest = null; + protected Session controlSession = null; + protected MessageProducer statusSender; + protected List<AMQConnection> clients = new ArrayList<AMQConnection>(); + protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); + protected NumberFormat nf = new DecimalFormat("##.00"); + protected String testName; + + public TestLauncher() + { + testName = System.getProperty("test_name","UNKNOWN"); + host = System.getProperty("host", "127.0.0.1"); + port = Integer.getInteger("port", 5672); + sessions_per_con = Integer.getInteger("ssn_per_con", 1); + connection_count = Integer.getInteger("con_count", 1); + heartbeat = Long.getLong("heartbeat", 5); + sender = Boolean.getBoolean("sender"); + receiver = Boolean.getBoolean("receiver"); + useUniqueDests = Boolean.getBoolean("use_unique_dests"); + + failover = System.getProperty("failover", ""); + durable = Boolean.getBoolean("durable"); + + url = "amqp://username:password@topicClientid/test?brokerlist='tcp://" + + host + ":" + port + "?heartbeat='" + heartbeat+ "''"; + + if (failover.equalsIgnoreCase("failover_exchange")) + { + url += "&failover='failover_exchange'"; + + System.out.println("Failover exchange " + url ); + } + + configureLogging(); + } + + protected void configureLogging() + { + PatternLayout layout = new PatternLayout(); + layout.setConversionPattern("%t %d %p [%c{4}] %m%n"); + BasicConfigurator.configure(new ConsoleAppender(layout)); + + String logLevel = System.getProperty("log.level","warn"); + String logComponent = System.getProperty("log.comp","org.apache.qpid"); + + Logger logger = Logger.getLogger(logComponent); + logger.setLevel(Level.toLevel(logLevel, Level.WARN)); + + System.out.println("Level " + logger.getLevel()); + + } + + public void setUpControlChannel() + { + try + { + controlCon = new AMQConnection(url); + controlCon.start(); + + controlDest = new AMQAnyDestination("control; {create: always}"); // durable + + // Create the session to setup the messages + controlSession = controlCon.createSession(false, Session.AUTO_ACKNOWLEDGE); + statusSender = controlSession.createProducer(controlDest); + + } + catch (Exception e) + { + handleError("Error while setting up the test",e); + } + } + + public void cleanup() + { + try + { + controlSession.close(); + controlCon.close(); + for (AMQConnection con : clients) + { + con.close(); + } + } + catch (Exception e) + { + handleError("Error while tearing down the test",e); + } + } + + public void start(String addr) + { + try + { + if (addr == null) + { + addr = address; + } + + int ssn_per_con = sessions_per_con; + String addrTemp = addr; + for (int i = 0; i< connection_count; i++) + { + AMQConnection con = new AMQConnection(url); + con.start(); + clients.add(con); + for (int j = 0; j< ssn_per_con; j++) + { + String index = createPrefix(i,j); + if (useUniqueDests) + { + addrTemp = modifySubject(index,addr); + } + + if (sender) + { + createSender(index,con,addrTemp,this); + } + + if (receiver) + { + System.out.println("########## Creating receiver ##################"); + + createReceiver(index,con,addrTemp,this); + } + } + } + } + catch (Exception e) + { + handleError("Exception while setting up the test",e); + } + + } + + protected void createReceiver(String index,final AMQConnection con, final String addr, final ErrorHandler h) + { + Runnable r = new Runnable() + { + public void run() + { + try + { + Receiver rcv = new Receiver(con,addr); + rcv.setErrorHandler(h); + rcv.run(); + } + catch (Exception e) + { + h.handleError("Error Starting Receiver", e); + } + } + }; + + Thread t = null; + try + { + t = Threading.getThreadFactory().newThread(r); + } + catch(Exception e) + { + handleError("Error creating Receive thread",e); + } + + t.setName("ReceiverThread-" + index); + t.start(); + } + + protected void createSender(String index,final AMQConnection con, final String addr, final ErrorHandler h) + { + Runnable r = new Runnable() + { + public void run() + { + try + { + Sender sender = new Sender(con, addr); + sender.setErrorHandler(h); + sender.run(); + } + catch (Exception e) + { + h.handleError("Error Starting Sender", e); + } + } + }; + + Thread t = null; + try + { + t = Threading.getThreadFactory().newThread(r); + } + catch(Exception e) + { + handleError("Error creating Sender thread",e); + } + + t.setName("SenderThread-" + index); + t.start(); + } + + public synchronized void handleError(String msg,Exception e) + { + // In case sending the message fails + StringBuilder sb = new StringBuilder(); + sb.append(msg); + sb.append(" @ "); + sb.append(df.format(new Date(System.currentTimeMillis()))); + sb.append(" "); + sb.append(e.getMessage()); + System.err.println(sb.toString()); + e.printStackTrace(); + + try + { + TextMessage errorMsg = controlSession.createTextMessage(); + errorMsg.setStringProperty("status", "error"); + errorMsg.setStringProperty("desc", msg); + errorMsg.setStringProperty("time", df.format(new Date(System.currentTimeMillis()))); + errorMsg.setStringProperty("exception-trace", serializeStackTrace(e)); + + System.out.println("Msg " + errorMsg); + + statusSender.send(errorMsg); + } + catch (JMSException e1) + { + e1.printStackTrace(); + } + } + + private String serializeStackTrace(Exception e) + { + ByteArrayOutputStream bOut = new ByteArrayOutputStream(); + PrintStream printStream = new PrintStream(bOut); + e.printStackTrace(printStream); + printStream.close(); + return bOut.toString(); + } + + private String createPrefix(int i, int j) + { + return String.valueOf(i).concat(String.valueOf(j)); + } + + /** + * A basic helper function to modify the subjects by + * appending an index. + */ + private String modifySubject(String index,String addr) + { + if (addr.indexOf("/") > 0) + { + addr = addr.substring(0,addr.indexOf("/")+1) + + index + + addr.substring(addr.indexOf("/")+1,addr.length()); + } + else if (addr.indexOf(";") > 0) + { + addr = addr.substring(0,addr.indexOf(";")) + + "/" + index + + addr.substring(addr.indexOf(";"),addr.length()); + } + else + { + addr = addr + "/" + index; + } + + return addr; + } + + public static void main(String[] args) + { + final TestLauncher test = new TestLauncher(); + test.setUpControlChannel(); + System.out.println("args.length " + args.length); + System.out.println("args [0] " + args [0]); + test.start(args.length > 0 ? args [0] : null); + Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { test.cleanup(); } + }); + + } +} |