summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-04-09 15:26:04 +0000
committerRobert Greig <rgreig@apache.org>2007-04-09 15:26:04 +0000
commitf966c066fc10c1510c244c41958e343c682dd1a1 (patch)
treeff0d2818fe3729bb6ebdfcb8d654d17b8599538f
parentb8b2e032a4a6a6ad796ce6247c581a0498b5c264 (diff)
downloadqpid-python-f966c066fc10c1510c244c41958e343c682dd1a1.tar.gz
Stopped throwing away exception causes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@526776 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--gentools/templ.java/MethodRegistryClass.tmpl4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java49
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/Main.java93
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java53
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java70
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePasswordInitialiser.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java2
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java4
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java352
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java254
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java45
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java9
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java28
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java14
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java95
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/testutil/Config.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQException.java56
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java9
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java16
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java4
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java5
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/topic/Config.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java3
37 files changed, 745 insertions, 524 deletions
diff --git a/gentools/templ.java/MethodRegistryClass.tmpl b/gentools/templ.java/MethodRegistryClass.tmpl
index 80bc61d4ce..388a1a936f 100644
--- a/gentools/templ.java/MethodRegistryClass.tmpl
+++ b/gentools/templ.java/MethodRegistryClass.tmpl
@@ -60,9 +60,9 @@ public class MainRegistry
if (bodyFactory == null)
{
- throw new AMQFrameDecodingException(
+ throw new AMQFrameDecodingException(null,
"Unable to find a suitable decoder for class " + classID + " and method " +
- methodID + " in AMQP version " + major + "-" + minor + ".");
+ methodID + " in AMQP version " + major + "-" + minor + ".", null);
}
return bodyFactory.newInstance(major, minor, in, size);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
index bcbc8f52f6..db9f3a7421 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
@@ -1,5 +1,25 @@
/*
*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+/*
+ *
* Copyright (c) 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -22,8 +42,12 @@ import javax.management.MBeanException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
+import org.apache.commons.configuration.Configuration;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.configuration.Configurator;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -36,9 +60,6 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.configuration.Configurator;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.commons.configuration.Configuration;
/**
* This MBean implements the broker management interface and exposes the
@@ -82,8 +103,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
* @param autoDelete
* @throws JMException
*/
- public void createNewExchange(String exchangeName, String type, boolean durable, boolean autoDelete)
- throws JMException
+ public void createNewExchange(String exchangeName, String type, boolean durable, boolean autoDelete) throws JMException
{
try
{
@@ -92,7 +112,9 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
Exchange exchange = _exchangeRegistry.getExchange(new AMQShortString(exchangeName));
if (exchange == null)
{
- exchange = _exchangeFactory.createExchange(new AMQShortString(exchangeName), new AMQShortString(type), durable, autoDelete, 0);
+ exchange =
+ _exchangeFactory.createExchange(new AMQShortString(exchangeName), new AMQShortString(type), durable,
+ autoDelete, 0);
_exchangeRegistry.registerExchange(exchange);
}
else
@@ -155,22 +177,27 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
{
ownerShortString = new AMQShortString(owner);
}
+
queue = new AMQQueue(new AMQShortString(queueName), durable, ownerShortString, false, getVirtualHost());
if (queue.isDurable() && !queue.isAutoDelete())
{
_messageStore.createQueue(queue);
}
- Configuration virtualHostDefaultQueueConfiguration = VirtualHostConfiguration.getDefaultQueueConfiguration(queue);
+ Configuration virtualHostDefaultQueueConfiguration =
+ VirtualHostConfiguration.getDefaultQueueConfiguration(queue);
if (virtualHostDefaultQueueConfiguration != null)
{
Configurator.configure(queue, virtualHostDefaultQueueConfiguration);
}
+
_queueRegistry.registerQueue(queue);
}
catch (AMQException ex)
{
- throw new MBeanException(new JMException(ex.getMessage()),"Error in creating queue " + queueName);
+ JMException jme = new JMException(ex.getMessage());
+ jme.initCause(ex);
+ throw new MBeanException(jme, "Error in creating queue " + queueName);
}
}
@@ -201,7 +228,9 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
}
catch (AMQException ex)
{
- throw new MBeanException(new JMException(ex.getMessage()), "Error in deleting queue " + queueName);
+ JMException jme = new JMException(ex.getMessage());
+ jme.initCause(ex);
+ throw new MBeanException(jme, "Error in deleting queue " + queueName);
}
}
@@ -212,7 +241,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
// This will have a single instance for a virtual host, so not having the name property in the ObjectName
public ObjectName getObjectName() throws MalformedObjectNameException
- {
+ {
return getObjectNameForSingleInstanceMBean();
}
} // End of MBean class
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java
index 1d26abb63f..fd010e8923 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -36,14 +36,17 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.configuration.ConfigurationException;
+
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;
import org.apache.log4j.xml.DOMConfigurator;
+
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.pool.ReadWriteThreadModel;
@@ -59,7 +62,7 @@ import org.apache.qpid.url.URLSyntaxException;
* Main entry point for AMQPD.
*
*/
-@SuppressWarnings({"AccessStaticViaInstance"})
+@SuppressWarnings({ "AccessStaticViaInstance" })
public class Main
{
private static final Logger _logger = Logger.getLogger(Main.class);
@@ -70,9 +73,9 @@ public class Main
protected static class InitException extends Exception
{
- InitException(String msg)
+ InitException(String msg, Throwable cause)
{
- super(msg);
+ super(msg, cause);
}
}
@@ -93,6 +96,7 @@ public class Main
try
{
commandLine = new PosixParser().parse(options, args);
+
return true;
}
catch (ParseException e)
@@ -100,6 +104,7 @@ public class Main
System.err.println("Error: " + e.getMessage());
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("Qpid", options, true);
+
return false;
}
}
@@ -108,17 +113,26 @@ public class Main
{
Option help = new Option("h", "help", false, "print this message");
Option version = new Option("v", "version", false, "print the version information and exit");
- Option configFile = OptionBuilder.withArgName("file").hasArg().withDescription("use given configuration file").
- withLongOpt("config").create("c");
- Option port = OptionBuilder.withArgName("port").hasArg().withDescription("listen on the specified port. Overrides any value in the config file").
- withLongOpt("port").create("p");
- Option bind = OptionBuilder.withArgName("bind").hasArg().withDescription("bind to the specified address. Overrides any value in the config file").
- withLongOpt("bind").create("b");
- Option logconfig = OptionBuilder.withArgName("logconfig").hasArg().withDescription("use the specified log4j xml configuration file. By " +
- "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME + " in the same directory as the configuration file").
- withLongOpt("logconfig").create("l");
- Option logwatchconfig = OptionBuilder.withArgName("logwatch").hasArg().withDescription("monitor the log file configuration file for changes. Units are seconds. " +
- "Zero means do not check for changes.").withLongOpt("logwatch").create("w");
+ Option configFile =
+ OptionBuilder.withArgName("file").hasArg().withDescription("use given configuration file").withLongOpt("config")
+ .create("c");
+ Option port =
+ OptionBuilder.withArgName("port").hasArg()
+ .withDescription("listen on the specified port. Overrides any value in the config file")
+ .withLongOpt("port").create("p");
+ Option bind =
+ OptionBuilder.withArgName("bind").hasArg()
+ .withDescription("bind to the specified address. Overrides any value in the config file")
+ .withLongOpt("bind").create("b");
+ Option logconfig =
+ OptionBuilder.withArgName("logconfig").hasArg()
+ .withDescription("use the specified log4j xml configuration file. By "
+ + "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME
+ + " in the same directory as the configuration file").withLongOpt("logconfig").create("l");
+ Option logwatchconfig =
+ OptionBuilder.withArgName("logwatch").hasArg()
+ .withDescription("monitor the log file configuration file for changes. Units are seconds. "
+ + "Zero means do not check for changes.").withLongOpt("logwatch").create("w");
options.addOption(help);
options.addOption(version);
@@ -146,7 +160,7 @@ public class Main
boolean first = true;
for (ProtocolVersion pv : ProtocolVersion.getSupportedProtocolVersions())
{
- if(first)
+ if (first)
{
first = false;
}
@@ -154,9 +168,11 @@ public class Main
{
protocol.append(", ");
}
+
protocol.append(pv.getMajorVersion()).append('-').append(pv.getMinorVersion());
}
+
System.out.println(ver + " (" + protocol + ")");
}
else
@@ -182,7 +198,6 @@ public class Main
}
}
-
protected void startup() throws InitException, ConfigurationException, Exception
{
final String QpidHome = System.getProperty("QPID_HOME");
@@ -197,7 +212,7 @@ public class Main
error = error + "\nNote: Qpid_HOME is not set.";
}
- throw new InitException(error);
+ throw new InitException(error, null);
}
else
{
@@ -222,8 +237,8 @@ public class Main
_logger.info("Starting Qpid.AMQP broker");
- ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance().
- getConfiguredObject(ConnectorConfiguration.class);
+ ConnectorConfiguration connectorConfig =
+ ApplicationRegistry.getInstance().getConfiguredObject(ConnectorConfiguration.class);
ByteBuffer.setUseDirectBuffers(connectorConfig.enableDirectBuffers);
@@ -245,7 +260,7 @@ public class Main
}
catch (NumberFormatException e)
{
- throw new InitException("Invalid port: " + portStr);
+ throw new InitException("Invalid port: " + portStr, e);
}
}
@@ -260,19 +275,21 @@ public class Main
int totalVHosts = ((Collection) virtualHosts).size();
for (int vhost = 0; vhost < totalVHosts; vhost++)
{
- setupVirtualHosts(configFile.getParent() , (String)((List)virtualHosts).get(vhost));
+ setupVirtualHosts(configFile.getParent(), (String) ((List) virtualHosts).get(vhost));
}
}
else
{
- setupVirtualHosts(configFile.getParent() , (String)virtualHosts);
+ setupVirtualHosts(configFile.getParent(), (String) virtualHosts);
}
}
+
bind(port, connectorConfig);
}
- protected void setupVirtualHosts(String configFileParent, String configFilePath) throws ConfigurationException, AMQException, URLSyntaxException
+ protected void setupVirtualHosts(String configFileParent, String configFilePath)
+ throws ConfigurationException, AMQException, URLSyntaxException
{
String configVar = "${conf}";
@@ -281,7 +298,7 @@ public class Main
configFilePath = configFileParent + configFilePath.substring(configVar.length());
}
- if (configFilePath.indexOf(".xml") != -1 )
+ if (configFilePath.indexOf(".xml") != -1)
{
VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(configFilePath);
vHostConfig.performBindings();
@@ -294,11 +311,12 @@ public class Main
String[] fileNames = virtualHostDir.list();
- for (int each=0; each < fileNames.length; each++)
+ for (int each = 0; each < fileNames.length; each++)
{
if (fileNames[each].endsWith(".xml"))
{
- VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(configFilePath+"/"+fileNames[each]);
+ VirtualHostConfiguration vHostConfig =
+ new VirtualHostConfiguration(configFilePath + "/" + fileNames[each]);
vHostConfig.performBindings();
}
}
@@ -315,7 +333,7 @@ public class Main
try
{
- //IoAcceptor acceptor = new SocketAcceptor(connectorConfig.processors);
+ // IoAcceptor acceptor = new SocketAcceptor(connectorConfig.processors);
IoAcceptor acceptor = connectorConfig.createAcceptor();
SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig();
SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
@@ -330,7 +348,7 @@ public class Main
{
sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
}
-
+
if (!connectorConfig.enableSSL || !connectorConfig.sslOnly)
{
AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
@@ -343,6 +361,7 @@ public class Main
{
bindAddress = new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port);
}
+
acceptor.bind(bindAddress, handler, sconfig);
_logger.info("Qpid.AMQP listening on non-SSL address " + bindAddress);
}
@@ -352,8 +371,7 @@ public class Main
AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
try
{
- acceptor.bind(new InetSocketAddress(connectorConfig.sslPort),
- handler, sconfig);
+ acceptor.bind(new InetSocketAddress(connectorConfig.sslPort), handler, sconfig);
_logger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort);
}
catch (IOException e)
@@ -391,10 +409,12 @@ public class Main
throw new Exception("Error parsing IP address: " + address, e);
}
}
+
if (index != 4)
{
throw new Exception("Invalid IP address: " + address);
}
+
return ip;
}
@@ -407,16 +427,17 @@ public class Main
}
catch (NumberFormatException e)
{
- System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be " +
- "a non-negative integer. Using default of zero (no watching configured");
+ System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be "
+ + "a non-negative integer. Using default of zero (no watching configured");
}
+
if (logConfigFile.exists() && logConfigFile.canRead())
{
System.out.println("Configuring logger using configuration file " + logConfigFile.getAbsolutePath());
if (logWatchTime > 0)
{
- System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every " +
- logWatchTime + " seconds");
+ System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every "
+ + logWatchTime + " seconds");
// log4j expects the watch interval in milliseconds
DOMConfigurator.configureAndWatch(logConfigFile.getAbsolutePath(), logWatchTime * 1000);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
index cf0253b4be..2ede1e49d7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,36 +20,37 @@
*/
package org.apache.qpid.server.management;
+import java.io.IOException;
import java.lang.management.ManagementFactory;
-import java.util.Map;
-import java.util.HashMap;
+import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
-import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
-import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
-import javax.management.remote.JMXServiceURL;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXConnectorServerFactory;
+import javax.management.remote.JMXServiceURL;
import javax.management.remote.MBeanServerForwarder;
+import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AccountNotFoundException;
import javax.security.sasl.AuthorizeCallback;
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
-import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
/**
* This class starts up an MBeanserver. If out of the box agent is being used then there are no security features
@@ -76,13 +77,15 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
boolean security = appRegistry.getConfiguration().getBoolean("management.security-enabled", true);
int port = appRegistry.getConfiguration().getInt("management.jmxport", 8999);
- _mbeanServer = platformServer ? ManagementFactory.getPlatformMBeanServer()
- : MBeanServerFactory.createMBeanServer(ManagedObject.DOMAIN);
+ _mbeanServer =
+ platformServer ? ManagementFactory.getPlatformMBeanServer()
+ : MBeanServerFactory.createMBeanServer(ManagedObject.DOMAIN);
// Check if the "QPID_OPTS" is set to use Out of the Box JMXAgent
if (areOutOfTheBoxJMXOptionsSet())
{
_log.info("JMX: Using the out of the box JMX Agent");
+
return;
}
@@ -95,18 +98,18 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
Map env = new HashMap();
env.put("jmx.remote.profiles", "SASL/PLAIN");
- //env.put("jmx.remote.profiles", "SASL/CRAM-MD5");
+ // env.put("jmx.remote.profiles", "SASL/CRAM-MD5");
Map<String, PrincipalDatabase> map = appRegistry.getDatabaseManager().getDatabases();
Map.Entry<String, PrincipalDatabase> entry = map.entrySet().iterator().next();
-
+
// Callback handler used by the PLAIN SASL server mechanism to perform user authentication
/*
PlainInitialiser plainInitialiser = new PlainInitialiser();
plainInitialiser.initialise(entry.getValue());
env.put("jmx.remote.sasl.callback.handler", plainInitialiser.getCallbackHandler());
- */
-
+ */
+
env.put("jmx.remote.sasl.callback.handler", new UserCallbackHandler(entry.getValue()));
// Enable the SSL security and server authentication
@@ -115,7 +118,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
SslRMIServerSocketFactory ssf = new SslRMIServerSocketFactory();
env.put(RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE, csf);
env.put(RMIConnectorServer.RMI_SERVER_SOCKET_FACTORY_ATTRIBUTE, ssf);
- */
+ */
try
{
@@ -162,7 +165,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
public void registerObject(ManagedObject managedObject) throws JMException
{
- _mbeanServer.registerMBean(managedObject, managedObject.getObjectName());
+ _mbeanServer.registerMBean(managedObject, managedObject.getObjectName());
}
public void unregisterObject(ManagedObject managedObject) throws JMException
@@ -180,9 +183,10 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
{
return true;
}
+
if (System.getProperty("com.sun.management.jmxremote.port") != null)
{
- return true;
+ return true;
}
return false;
@@ -248,21 +252,24 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
boolean authorized = false;
// Process retrieval of password; can get password if username is available in NameCallback
- if (ncb != null && pcb != null)
+ if ((ncb != null) && (pcb != null))
{
String username = ncb.getDefaultName();
try
{
- authorized =_principalDatabase.verifyPassword(new UsernamePrincipal(username), pcb.getPassword());
+ authorized = _principalDatabase.verifyPassword(new UsernamePrincipal(username), pcb.getPassword());
}
catch (AccountNotFoundException e)
{
- throw new IOException("User not authorized. " + e);
+ IOException ioe = new IOException("User not authorized. " + e);
+ ioe.initCause(e);
+ throw ioe;
}
}
+
if (!authorized)
{
- throw new IOException("User not authorized.");
+ throw new IOException("User not authorized.");
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 7a32848c44..e7e7f5c22f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -347,7 +347,9 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
}
catch (AMQException e)
{
- throw new JMException("Error creating header attributes list: " + e);
+ JMException jme = new JMException("Error creating header attributes list: " + e);
+ jme.initCause(e);
+ throw jme;
}
}
@@ -381,7 +383,9 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
}
catch (AMQException e)
{
- throw new JMException("Error creating message contents: " + e);
+ JMException jme = new JMException("Error creating message contents: " + e);
+ jme.initCause(e);
+ throw jme;
}
return _messageList;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index 8806ac0516..89f0b7b39d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -153,7 +153,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
{
_logger.error("Error configuring application: " + e, e);
//throw new AMQBrokerCreationException(instanceID, "Unable to create Application Registry instance " + instanceID);
- throw new RuntimeException("Unable to create Application Registry");
+ throw new RuntimeException("Unable to create Application Registry", e);
}
}
else
@@ -193,7 +193,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
catch (Exception e)
{
_logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
- throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
+ throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e);
}
Configurator.configure(instance);
_configuredObjects.put(instanceType, instance);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java
index 1ccb13cf62..35d036d20f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java
@@ -116,7 +116,9 @@ public class AccessManagerImpl implements AccessManager
}
catch (Exception e)
{
- throw new ConfigurationException(e.getMessage(), e.getCause());
+ ConfigurationException ce = new ConfigurationException(e.getMessage(), e.getCause());
+ ce.initCause(e);
+ throw ce;
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java
index 2e019e1cfb..8d2bf6d5ab 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java
@@ -1,38 +1,40 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
package org.apache.qpid.server.security.auth.database;
+import java.io.FileNotFoundException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
-import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
-import org.apache.qpid.configuration.PropertyUtils;
+
import org.apache.log4j.Logger;
-import java.util.Map;
-import java.util.List;
-import java.util.HashMap;
-import java.lang.reflect.Method;
-import java.io.FileNotFoundException;
+import org.apache.qpid.configuration.PropertyUtils;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatabaseManager
{
@@ -80,23 +82,26 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab
initialisePrincipalDatabase((PrincipalDatabase) o, config, i);
String name = databaseNames.get(i);
- if (name == null || name.length() == 0)
+ if ((name == null) || (name.length() == 0))
{
throw new Exception("Principal database names must have length greater than or equal to one character");
}
+
PrincipalDatabase pd = databases.get(name);
if (pd != null)
{
throw new Exception("Duplicate principal database name not provided");
}
+
_logger.info("Initialised principal database '" + name + "' successfully");
databases.put(name, (PrincipalDatabase) o);
}
+
return databases;
}
private void initialisePrincipalDatabase(PrincipalDatabase principalDatabase, Configuration config, int index)
- throws FileNotFoundException, ConfigurationException
+ throws FileNotFoundException, ConfigurationException
{
String baseName = _base + "(" + index + ").attributes.attribute.";
List<String> argumentNames = config.getList(baseName + "name");
@@ -104,14 +109,16 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab
for (int i = 0; i < argumentNames.size(); i++)
{
String argName = argumentNames.get(i);
- if (argName == null || argName.length() == 0)
+ if ((argName == null) || (argName.length() == 0))
{
throw new ConfigurationException("Argument names must have length >= 1 character");
}
+
if (Character.isLowerCase(argName.charAt(0)))
{
argName = Character.toUpperCase(argName.charAt(0)) + argName.substring(1);
}
+
String methodName = "set" + argName;
Method method = null;
try
@@ -125,9 +132,10 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab
if (method == null)
{
- throw new ConfigurationException("No method " + methodName + " found in class " + principalDatabase.getClass() +
- " hence unable to configure principal database. The method must be public and " +
- "have a single String argument with a void return type");
+ throw new ConfigurationException("No method " + methodName + " found in class "
+ + principalDatabase.getClass()
+ + " hence unable to configure principal database. The method must be public and "
+ + "have a single String argument with a void return type");
}
try
@@ -138,11 +146,11 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab
{
if (ite instanceof ConfigurationException)
{
- throw(ConfigurationException) ite;
+ throw (ConfigurationException) ite;
}
else
{
- throw new ConfigurationException(ite.getMessage(), ite.getCause());
+ throw new ConfigurationException(ite.getMessage(), ite);
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePasswordInitialiser.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePasswordInitialiser.java
index 68095de3a0..dd0bd096c3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePasswordInitialiser.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePasswordInitialiser.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -33,14 +33,16 @@ import javax.security.auth.login.AccountNotFoundException;
import javax.security.sasl.AuthorizeCallback;
import org.apache.commons.configuration.Configuration;
+
import org.apache.log4j.Logger;
+
import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
public abstract class UsernamePasswordInitialiser implements AuthenticationProviderInitialiser
{
- protected static final Logger _logger = Logger.getLogger(UsernamePasswordInitialiser.class);
+ protected static final Logger _logger = Logger.getLogger(UsernamePasswordInitialiser.class);
private ServerCallbackHandler _callbackHandler;
@@ -72,7 +74,9 @@ public abstract class UsernamePasswordInitialiser implements AuthenticationProvi
{
// very annoyingly the callback handler does not throw anything more appropriate than
// IOException
- throw new IOException("Error looking up user " + e);
+ IOException ioe = new IOException("Error looking up user " + e);
+ ioe.initCause(e);
+ throw ioe;
}
}
else if (callback instanceof AuthorizeCallback)
@@ -88,7 +92,7 @@ public abstract class UsernamePasswordInitialiser implements AuthenticationProvi
}
public void initialise(String baseConfigPath, Configuration configuration,
- Map<String, PrincipalDatabase> principalDatabases) throws Exception
+ Map<String, PrincipalDatabase> principalDatabases) throws Exception
{
String principalDatabaseName = configuration.getString(baseConfigPath + ".principal-database");
PrincipalDatabase db = principalDatabases.get(principalDatabaseName);
@@ -102,6 +106,7 @@ public abstract class UsernamePasswordInitialiser implements AuthenticationProvi
{
throw new NullPointerException("Cannot initialise with a null Principal database.");
}
+
_callbackHandler = new ServerCallbackHandler(db);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index c24d1aa23a..b5c59dbbb7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -181,7 +181,7 @@ public class VirtualHost implements Accessable
catch (Exception e)
{
_logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
- throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
+ throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e);
}
Configurator.configure(instance);
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java
index 88bcbbbccb..f3b21e3c64 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java
@@ -47,7 +47,9 @@ public class FileMessageFactory
}
catch (IOException e)
{
- throw new MessageFactoryException(e.toString());
+ MessageFactoryException mfe = new MessageFactoryException(e.toString());
+ mfe.initCause(e);
+ throw mfe;
}
}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java
index 8505d1d457..98a2c0d497 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java
@@ -59,11 +59,11 @@ public class InitialContextHelper
}
catch (IOException e)
{
- throw new ContextException(e.toString());
+ throw new ContextException(e.toString(), e);
}
catch (NamingException n)
{
- throw new ContextException(n.toString());
+ throw new ContextException(n.toString(), n);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 2274f2964f..0e3d99eeba 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,6 +20,29 @@
*/
package org.apache.qpid.client;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.nio.channels.UnresolvedAddressException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import javax.naming.StringRefAddr;
+
+import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
@@ -44,28 +67,6 @@ import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.url.URLSyntaxException;
-import org.apache.log4j.Logger;
-
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-import javax.naming.NamingException;
-import javax.naming.Reference;
-import javax.naming.Referenceable;
-import javax.naming.StringRefAddr;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.nio.channels.UnresolvedAddressException;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
private static final Logger _logger = Logger.getLogger(AMQConnection.class);
@@ -95,7 +96,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private AMQProtocolHandler _protocolHandler;
/** Maps from session id (Integer) to AMQSession instance */
- private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap
+ private final Map _sessions = new LinkedHashMap(); // fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap
private String _clientName;
@@ -125,15 +126,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/*
* _Connected should be refactored with a suitable wait object.
- */
+ */
private boolean _connected;
/*
* The last error code that occured on the connection. Used to return the correct exception to the client
- */
+ */
private AMQException _lastAMQException = null;
-
/*
* The connection meta data
*/
@@ -161,13 +161,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
* @throws AMQException
* @throws URLSyntaxException
*/
- public AMQConnection(String broker, String username, String password,
- String clientName, String virtualHost) throws AMQException, URLSyntaxException
+ public AMQConnection(String broker, String username, String password, String clientName, String virtualHost)
+ throws AMQException, URLSyntaxException
{
- this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
- username + ":" + password + "@" +
- (clientName == null ? "" : clientName) + "/" +
- virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), null);
+ this(new AMQConnectionURL(
+ ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='"
+ + AMQBrokerDetails.checkTransport(broker) + "'"), null);
}
/**
@@ -180,44 +180,38 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
* @throws AMQException
* @throws URLSyntaxException
*/
- public AMQConnection(String broker, String username, String password,
- String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
+ public AMQConnection(String broker, String username, String password, String clientName, String virtualHost,
+ SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
{
- this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
- username + ":" + password + "@" +
- (clientName == null ? "" : clientName) + "/" +
- virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig);
+ this(new AMQConnectionURL(
+ ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='"
+ + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig);
}
-
- public AMQConnection(String host, int port, String username, String password,
- String clientName, String virtualHost) throws AMQException, URLSyntaxException
+ public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost)
+ throws AMQException, URLSyntaxException
{
this(host, port, false, username, password, clientName, virtualHost, null);
}
- public AMQConnection(String host, int port, String username, String password,
- String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
+ public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost,
+ SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
{
this(host, port, false, username, password, clientName, virtualHost, sslConfig);
}
-
- public AMQConnection(String host, int port, boolean useSSL, String username, String password,
- String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
+ public AMQConnection(String host, int port, boolean useSSL, String username, String password, String clientName,
+ String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
{
- this(new AMQConnectionURL(useSSL ?
- ConnectionURL.AMQ_PROTOCOL + "://" +
- username + ":" + password + "@" +
- (clientName == null ? "" : clientName) +
- virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
- + "," + ConnectionURL.OPTIONS_SSL + "='true'" :
- ConnectionURL.AMQ_PROTOCOL + "://" +
- username + ":" + password + "@" +
- (clientName == null ? "" : clientName) +
- virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
- + "," + ConnectionURL.OPTIONS_SSL + "='false'"
- ), sslConfig);
+ this(new AMQConnectionURL(
+ useSSL
+ ? (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port
+ + "'" + "," + ConnectionURL.OPTIONS_SSL + "='true'")
+ : (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port
+ + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig);
}
public AMQConnection(String connection) throws AMQException, URLSyntaxException
@@ -230,13 +224,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
this(new AMQConnectionURL(connection), sslConfig);
}
-
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
{
if (_logger.isInfoEnabled())
{
_logger.info("Connection:" + connectionURL);
}
+
_sslConfiguration = sslConfig;
if (connectionURL == null)
{
@@ -250,7 +244,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_password = connectionURL.getPassword();
setVirtualHost(connectionURL.getVirtualHost());
-
if (connectionURL.getDefaultQueueExchangeName() != null)
{
_defaultQueueExchangeName = connectionURL.getDefaultQueueExchangeName();
@@ -271,7 +264,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName();
}
-
_failoverPolicy = new FailoverPolicy(connectionURL);
_protocolHandler = new AMQProtocolHandler(this);
@@ -279,7 +271,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// We are not currently connected
_connected = false;
-
Exception lastException = new Exception();
lastException.initCause(new ConnectException());
@@ -297,7 +288,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
if (_logger.isInfoEnabled())
{
- _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e.getCause());
+ _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(),
+ e.getCause());
}
}
}
@@ -323,7 +315,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
- if (message == null || message.equals(""))
+ if ((message == null) || message.equals(""))
{
message = "Unable to Connect";
}
@@ -336,11 +328,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString());
}
+
e.initCause(lastException);
}
throw e;
}
+
_connectionMetaData = new QpidConnectionMetaData(this);
}
@@ -370,6 +364,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
virtualHost = virtualHost.substring(1);
}
+
_virtualHost = virtualHost;
}
@@ -383,7 +378,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_protocolHandler.attainState(AMQState.CONNECTION_OPEN);
_failoverPolicy.attainedConnection();
- //Again this should be changed to a suitable notify
+ // Again this should be changed to a suitable notify
_connected = true;
}
catch (AMQException e)
@@ -402,6 +397,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
try
{
makeBrokerConnection(bd);
+
return true;
}
catch (Exception e)
@@ -410,8 +406,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_logger.info("Unable to connect to broker at " + bd);
}
+
attemptReconnection();
}
+
return false;
}
@@ -422,6 +420,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
try
{
makeBrokerConnection(_failoverPolicy.getNextBrokerDetails());
+
return true;
}
catch (Exception e)
@@ -437,13 +436,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
if (_logger.isInfoEnabled())
{
- _logger.info(e.getMessage() + ":Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails());
+ _logger.info(e.getMessage() + ":Unable to connect to broker at "
+ + _failoverPolicy.getCurrentBrokerDetails());
}
}
}
}
- //connection unsuccessful
+ // connection unsuccessful
return false;
}
@@ -475,14 +475,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH_HIGH_MARK);
}
- public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
- final int prefetch) throws JMSException
+ public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)
+ throws JMSException
{
return createSession(transacted, acknowledgeMode, prefetch, prefetch);
}
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
- final int prefetchHigh, final int prefetchLow) throws JMSException
+ final int prefetchHigh, final int prefetchLow) throws JMSException
{
checkNotClosed();
if (channelLimitReached())
@@ -492,85 +492,81 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
else
{
return (org.apache.qpid.jms.Session) new FailoverSupport()
- {
- public Object operation() throws JMSException
- {
- int channelId = _idFactory.incrementAndGet();
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Write channel open frame for channel id " + channelId);
- }
-
- // We must create the session and register it before actually sending the frame to the server to
- // open it, so that there is no window where we could receive data on the channel and not be set
- // up to handle it appropriately.
- AMQSession session = new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode,
- prefetchHigh, prefetchLow);
- _protocolHandler.addSessionByChannel(channelId, session);
- registerSession(channelId, session);
-
- boolean success = false;
- try
- {
- createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
- success = true;
- }
- catch (AMQException e)
- {
- JMSException jmse = new JMSException("Error creating session: " + e);
- jmse.setLinkedException(e);
- throw jmse;
- }
- finally
- {
- if (!success)
- {
- _protocolHandler.removeSessionByChannel(channelId);
- deregisterSession(channelId);
- }
- }
-
- if (_started)
{
- try
- {
- session.start();
- }
- catch (AMQException e)
+ public Object operation() throws JMSException
{
- throw new JMSAMQException(e);
+ int channelId = _idFactory.incrementAndGet();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Write channel open frame for channel id " + channelId);
+ }
+
+ // We must create the session and register it before actually sending the frame to the server to
+ // open it, so that there is no window where we could receive data on the channel and not be set
+ // up to handle it appropriately.
+ AMQSession session =
+ new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh,
+ prefetchLow);
+ _protocolHandler.addSessionByChannel(channelId, session);
+ registerSession(channelId, session);
+
+ boolean success = false;
+ try
+ {
+ createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
+ success = true;
+ }
+ catch (AMQException e)
+ {
+ JMSException jmse = new JMSException("Error creating session: " + e);
+ jmse.setLinkedException(e);
+ throw jmse;
+ }
+ finally
+ {
+ if (!success)
+ {
+ _protocolHandler.removeSessionByChannel(channelId);
+ deregisterSession(channelId);
+ }
+ }
+
+ if (_started)
+ {
+ try
+ {
+ session.start();
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+ }
+
+ return session;
}
- }
- return session;
- }
- }.execute(this);
+ }.execute(this);
}
}
private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
- throws AMQException
+ throws AMQException
{
// TODO: Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(
- ChannelOpenBody.createAMQFrame(channelId,
- _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(),
- null), // outOfBand
- ChannelOpenOkBody.class);
-
- //todo send low water mark when protocol allows.
- //todo Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(
- BasicQosBody.createAMQFrame(channelId,
- _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(),
- false, // global
- prefetchHigh, // prefetchCount
- 0), // prefetchSize
- BasicQosOkBody.class);
+ _protocolHandler.syncWrite(ChannelOpenBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion(), null), // outOfBand
+ ChannelOpenOkBody.class);
+
+ // todo send low water mark when protocol allows.
+ // todo Be aware of possible changes to parameter order as versions change.
+ _protocolHandler.syncWrite(BasicQosBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion(), false, // global
+ prefetchHigh, // prefetchCount
+ 0), // prefetchSize
+ BasicQosOkBody.class);
if (transacted)
{
@@ -580,10 +576,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
// TODO: Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId,
- _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion()),
- TxSelectOkBody.class);
+ _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion()), TxSelectOkBody.class);
}
}
@@ -597,11 +591,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_protocolHandler.removeSessionByChannel(channelId);
deregisterSession(channelId);
- throw new AMQException("Error reopening channel " + channelId + " after failover: " + e);
+ throw new AMQException("Error reopening channel " + channelId + " after failover: " + e, e);
}
}
-
public void setFailoverPolicy(FailoverPolicy policy)
{
_failoverPolicy = policy;
@@ -646,12 +639,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private boolean channelLimitReached()
{
- return _maximumChannelCount != 0 && _sessions.size() == _maximumChannelCount;
+ return (_maximumChannelCount != 0) && (_sessions.size() == _maximumChannelCount);
}
public String getClientID() throws JMSException
{
checkNotClosed();
+
return _clientName;
}
@@ -667,6 +661,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public ConnectionMetaData getMetaData() throws JMSException
{
checkNotClosed();
+
return _connectionMetaData;
}
@@ -674,6 +669,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public ExceptionListener getExceptionListener() throws JMSException
{
checkNotClosed();
+
return _exceptionListener;
}
@@ -707,6 +703,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
throw new JMSAMQException(e);
}
}
+
_started = true;
}
}
@@ -727,6 +724,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
throw new JMSAMQException(e);
}
}
+
_started = false;
}
}
@@ -753,7 +751,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
try
{
- //adjust timeout
+ // adjust timeout
long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
_taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
@@ -764,7 +762,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
- //adjust timeout
+ // adjust timeout
timeout = adjustTimeout(timeout, startCloseTime);
_protocolHandler.closeConnection(timeout);
@@ -772,7 +770,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
catch (AMQException e)
{
- throw new JMSException("Error closing connection: " + e);
+ JMSException jmse = new JMSException("Error closing connection: " + e);
+ jmse.setLinkedException(e);
+ throw jmse;
}
}
}
@@ -786,6 +786,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
timeout = 0;
}
+
return timeout;
}
@@ -804,6 +805,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
session.markClosed();
}
+
_sessions.clear();
}
@@ -843,6 +845,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
}
+
_sessions.clear();
if (sessionException != null)
{
@@ -851,42 +854,42 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
- ServerSessionPool sessionPool,
- int maxMessages) throws JMSException
+ ServerSessionPool sessionPool, int maxMessages) throws JMSException
{
checkNotClosed();
+
return null;
}
- public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
- ServerSessionPool sessionPool,
- int maxMessages) throws JMSException
+ public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool,
+ int maxMessages) throws JMSException
{
checkNotClosed();
+
return null;
}
- public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
- ServerSessionPool sessionPool,
- int maxMessages) throws JMSException
+ public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool,
+ int maxMessages) throws JMSException
{
checkNotClosed();
+
return null;
}
- public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
- String messageSelector, ServerSessionPool sessionPool,
- int maxMessages)
- throws JMSException
+ public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector,
+ ServerSessionPool sessionPool, int maxMessages) throws JMSException
{
// TODO Auto-generated method stub
checkNotClosed();
+
return null;
}
public long getMaximumChannelCount() throws JMSException
{
checkNotClosed();
+
return _maximumChannelCount;
}
@@ -975,6 +978,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
proceed = _connectionListener.preFailover(redirect);
}
+
return proceed;
}
@@ -995,6 +999,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
markAllSessionsClosed();
}
+
return resubscribe;
}
else
@@ -1058,12 +1063,15 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
if (cause instanceof AMQException)
{
- je = new JMSException(Integer.toString(((AMQException) cause).getErrorCode().getCode()), "Exception thrown against " + toString() + ": " + cause);
+ je =
+ new JMSException(Integer.toString(((AMQException) cause).getErrorCode().getCode()),
+ "Exception thrown against " + toString() + ": " + cause);
}
else
{
je = new JMSException("Exception thrown against " + toString() + ": " + cause);
}
+
if (cause instanceof Exception)
{
je.setLinkedException((Exception) cause);
@@ -1091,6 +1099,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_logger.info("Closing AMQConnection due to :" + cause.getMessage());
}
+
_closed.set(true);
closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
}
@@ -1146,9 +1155,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
buf.append("Host: ").append(String.valueOf(bd.getHost()));
buf.append("\nPort: ").append(String.valueOf(bd.getPort()));
}
+
buf.append("\nVirtual Host: ").append(String.valueOf(_virtualHost));
buf.append("\nClient ID: ").append(String.valueOf(_clientName));
- buf.append("\nActive session count: ").append(_sessions == null ? 0 : _sessions.size());
+ buf.append("\nActive session count: ").append((_sessions == null) ? 0 : _sessions.size());
+
return buf.toString();
}
@@ -1159,11 +1170,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public Reference getReference() throws NamingException
{
- return new Reference(
- AMQConnection.class.getName(),
- new StringRefAddr(AMQConnection.class.getName(), toURL()),
- AMQConnectionFactory.class.getName(),
- null); // factory location
+ return new Reference(AMQConnection.class.getName(), new StringRefAddr(AMQConnection.class.getName(), toURL()),
+ AMQConnectionFactory.class.getName(), null); // factory location
}
public SSLConfiguration getSSLConfiguration()
@@ -1176,19 +1184,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _defaultTopicExchangeName;
}
-
public void setDefaultTopicExchangeName(AMQShortString defaultTopicExchangeName)
{
_defaultTopicExchangeName = defaultTopicExchangeName;
}
-
public AMQShortString getDefaultQueueExchangeName()
{
return _defaultQueueExchangeName;
}
-
public void setDefaultQueueExchangeName(AMQShortString defaultQueueExchangeName)
{
_defaultQueueExchangeName = defaultQueueExchangeName;
@@ -1201,10 +1206,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public AMQShortString getTemporaryQueueExchangeName()
{
- return _temporaryQueueExchangeName; //To change body of created methods use File | Settings | File Templates.
+ return _temporaryQueueExchangeName; // To change body of created methods use File | Settings | File Templates.
}
-
public void setTemporaryTopicExchangeName(AMQShortString temporaryTopicExchangeName)
{
_temporaryTopicExchangeName = temporaryTopicExchangeName;
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 73010ce517..38c1cd8205 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -20,9 +20,9 @@
*/
package org.apache.qpid.client;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -34,6 +34,7 @@ import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
@@ -138,10 +139,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private boolean _noConsume;
private List<StackTraceElement> _closedStack = null;
- protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector,
- boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
- AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable,
- int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+ protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
+ String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
+ AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
+ boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
{
_channelId = channelId;
_connection = connection;
@@ -160,7 +161,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_autoClose = autoClose;
_noConsume = noConsume;
- //Force queue browsers not to use acknowledge modes.
+ // Force queue browsers not to use acknowledge modes.
if (_noConsume)
{
_acknowledgeMode = Session.NO_ACKNOWLEDGE;
@@ -175,12 +176,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public String getMessageSelector() throws JMSException
{
checkPreConditions();
+
return _messageSelector;
}
public MessageListener getMessageListener() throws JMSException
{
checkPreConditions();
+
return _messageListener.get();
}
@@ -198,14 +201,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
checkPreConditions();
- //if the current listener is non-null and the session is not stopped, then
- //it is an error to call this method.
+ // if the current listener is non-null and the session is not stopped, then
+ // it is an error to call this method.
- //i.e. it is only valid to call this method if
+ // i.e. it is only valid to call this method if
//
- // (a) the connection is stopped, in which case the dispatcher is not running
- // OR
- // (b) the listener is null AND we are not receiving synchronously at present
+ // (a) the connection is stopped, in which case the dispatcher is not running
+ // OR
+ // (b) the listener is null AND we are not receiving synchronously at present
//
if (!_session.getAMQConnection().started())
@@ -215,7 +218,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_logger.isDebugEnabled())
{
- _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " + _destination);
+ _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination "
+ + _destination);
}
}
else
@@ -224,6 +228,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
}
+
if (!_messageListener.compareAndSet(null, messageListener))
{
throw new javax.jms.IllegalStateException("Attempt to alter listener while session is started.");
@@ -233,7 +238,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (messageListener != null)
{
- //handle case where connection has already been started, and the dispatcher has alreaded started
+ // handle case where connection has already been started, and the dispatcher has alreaded started
// putting values on the _synchronousQueue
synchronized (_session)
@@ -263,10 +268,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
throw new javax.jms.IllegalStateException("Another thread is already receiving.");
}
+
if (isMessageListenerSet())
{
throw new javax.jms.IllegalStateException("A listener has already been set.");
}
+
_receivingThread = Thread.currentThread();
}
@@ -331,6 +338,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
return null;
}
+
Object o = null;
if (l > 0)
{
@@ -340,6 +348,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
o = _synchronousQueue.take();
}
+
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
@@ -352,6 +361,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
catch (InterruptedException e)
{
_logger.warn("Interrupted: " + e);
+
return null;
}
finally
@@ -365,6 +375,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty())
{
close(false);
+
return true;
}
else
@@ -387,6 +398,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
return null;
}
+
Object o = _synchronousQueue.poll();
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
@@ -414,8 +426,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
* @throws JMSException if the argument is a throwable. If it is a JMSException it is rethrown as is, but if not a
* JMSException is created with the linked exception set appropriately
*/
- private AbstractJMSMessage returnMessageOrThrow(Object o)
- throws JMSException
+ private AbstractJMSMessage returnMessageOrThrow(Object o) throws JMSException
{
// errors are passed via the queue too since there is no way of interrupting the poll() via the API.
if (o instanceof Throwable)
@@ -425,6 +436,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
e.setLinkedException((Exception) o);
}
+
throw e;
}
else
@@ -433,7 +445,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
-
public void close() throws JMSException
{
close(true);
@@ -441,7 +452,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public void close(boolean sendClose) throws JMSException
{
- //synchronized (_closed)
+ // synchronized (_closed)
if (_logger.isInfoEnabled())
{
@@ -456,7 +467,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
if (_closedStack != null)
{
- _logger.trace(_consumerTag + " close():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+ _logger.trace(_consumerTag + " close():"
+ + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
_logger.trace(_consumerTag + " previously:" + _closedStack.toString());
}
else
@@ -464,14 +476,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6);
}
}
+
if (sendClose)
{
// TODO: Be aware of possible changes to parameter order as versions change.
- final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId,
- _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(),
- _consumerTag, // consumerTag
- false); // nowait
+ final AMQFrame cancelFrame =
+ BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag
+ false); // nowait
try
{
@@ -485,25 +497,28 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
catch (AMQException e)
{
- _logger.error("Error closing consumer: " + e, e);
- throw new JMSException("Error closing consumer: " + e);
+ // _logger.error("Error closing consumer: " + e, e);
+ JMSException jmse = new JMSException("Error closing consumer: " + e);
+ jmse.setLinkedException(e);
+ throw jmse;
}
}
else
{
-// //fixme this probably is not right
-// if (!isNoConsume())
- { //done in BasicCancelOK Handler but not sending one so just deregister.
+ // //fixme this probably is not right
+ // if (!isNoConsume())
+ { // done in BasicCancelOK Handler but not sending one so just deregister.
deregisterConsumer();
}
}
- if (_messageListener != null && _receiving.get())
+ if ((_messageListener != null) && _receiving.get())
{
if (_logger.isInfoEnabled())
{
_logger.info("Interrupting thread: " + _receivingThread);
}
+
_receivingThread.interrupt();
}
}
@@ -516,7 +531,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
*/
void markClosed()
{
-// synchronized (_closed)
+ // synchronized (_closed)
{
_closed.set(true);
@@ -524,7 +539,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
if (_closedStack != null)
{
- _logger.trace(_consumerTag + " markClosed():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+ _logger.trace(_consumerTag + " markClosed():"
+ + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
_logger.trace(_consumerTag + " previously:" + _closedStack.toString());
}
else
@@ -533,6 +549,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
}
+
deregisterConsumer();
}
@@ -551,22 +568,22 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_logger.debug("notifyMessage called with message number " + messageFrame.getDeliverBody().deliveryTag);
}
+
try
{
- AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag,
- messageFrame.getDeliverBody().redelivered,
- messageFrame.getDeliverBody().exchange,
- messageFrame.getDeliverBody().routingKey,
- messageFrame.getContentHeader(),
- messageFrame.getBodies());
+ AbstractJMSMessage jmsMessage =
+ _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag,
+ messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange,
+ messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies());
if (debug)
{
_logger.debug("Message is of type: " + jmsMessage.getClass().getName());
}
-// synchronized (_closed)
+ // synchronized (_closed)
+
{
-// if (!_closed.get())
+ // if (!_closed.get())
{
jmsMessage.setConsumer(this);
@@ -575,12 +592,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
notifyMessage(jmsMessage, channelId);
}
-// else
-// {
-// _logger.error("MESSAGE REJECTING!");
-// _session.rejectMessage(jmsMessage, true);
-// //_logger.error("MESSAGE JUST DROPPED!");
-// }
+ // else
+ // {
+ // _logger.error("MESSAGE REJECTING!");
+ // _session.rejectMessage(jmsMessage, true);
+ // //_logger.error("MESSAGE JUST DROPPED!");
+ // }
}
}
catch (Exception e)
@@ -606,11 +623,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
if (isMessageListenerSet())
{
- //we do not need a lock around the test above, and the dispatch below as it is invalid
- //for an application to alter an installed listener while the session is started
-// synchronized (_closed)
+ // we do not need a lock around the test above, and the dispatch below as it is invalid
+ // for an application to alter an installed listener while the session is started
+ // synchronized (_closed)
{
-// if (!_closed.get())
+ // if (!_closed.get())
{
preApplicationProcessing(jmsMessage);
@@ -641,14 +658,16 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
switch (_acknowledgeMode)
{
- case Session.PRE_ACKNOWLEDGE:
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- break;
- case Session.CLIENT_ACKNOWLEDGE:
- // we set the session so that when the user calls acknowledge() it can call the method on session
- // to send out the appropriate frame
- msg.setAMQSession(_session);
- break;
+
+ case Session.PRE_ACKNOWLEDGE:
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ break;
+
+ case Session.CLIENT_ACKNOWLEDGE:
+ // we set the session so that when the user calls acknowledge() it can call the method on session
+ // to send out the appropriate frame
+ msg.setAMQSession(_session);
+ break;
}
}
@@ -657,47 +676,56 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
msg.setJMSDestination(_destination);
switch (_acknowledgeMode)
{
- case Session.CLIENT_ACKNOWLEDGE:
- if (isNoConsume())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
- break;
- case Session.DUPS_OK_ACKNOWLEDGE:
- if (++_outstanding >= _prefetchHigh)
- {
- _dups_ok_acknowledge_send = true;
- }
- if (_outstanding <= _prefetchLow)
- {
- _dups_ok_acknowledge_send = false;
- }
- if (_dups_ok_acknowledge_send)
- {
- if (!_session.isInRecovery())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), true);
- }
- }
- break;
- case Session.AUTO_ACKNOWLEDGE:
- // we do not auto ack a message if the application code called recover()
+ case Session.CLIENT_ACKNOWLEDGE:
+ if (isNoConsume())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+
+ break;
+
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ if (++_outstanding >= _prefetchHigh)
+ {
+ _dups_ok_acknowledge_send = true;
+ }
+
+ if (_outstanding <= _prefetchLow)
+ {
+ _dups_ok_acknowledge_send = false;
+ }
+
+ if (_dups_ok_acknowledge_send)
+ {
if (!_session.isInRecovery())
{
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ _session.acknowledgeMessage(msg.getDeliveryTag(), true);
}
- break;
- case Session.SESSION_TRANSACTED:
- if (isNoConsume())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
- else
- {
- _receivedDeliveryTags.add(msg.getDeliveryTag());
- }
- break;
+ }
+
+ break;
+
+ case Session.AUTO_ACKNOWLEDGE:
+ // we do not auto ack a message if the application code called recover()
+ if (!_session.isInRecovery())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+
+ break;
+
+ case Session.SESSION_TRANSACTED:
+ if (isNoConsume())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+ else
+ {
+ _receivedDeliveryTags.add(msg.getDeliveryTag());
+ }
+
+ break;
}
}
@@ -721,14 +749,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
void notifyError(Throwable cause)
{
-// synchronized (_closed)
+ // synchronized (_closed)
{
_closed.set(true);
if (_logger.isTraceEnabled())
{
if (_closedStack != null)
{
- _logger.trace(_consumerTag + " notifyError():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+ _logger.trace(_consumerTag + " notifyError():"
+ + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
_logger.trace(_consumerTag + " previously" + _closedStack.toString());
}
else
@@ -737,7 +766,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
}
- //QPID-293 can "request redelivery of this error through dispatcher"
+ // QPID-293 can "request redelivery of this error through dispatcher"
// we have no way of propagating the exception to a message listener - a JMS limitation - so we
// deal with the case where we have a synchronous receive() waiting for a message to arrive
@@ -749,10 +778,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_logger.debug("Passed exception to synchronous queue for propagation to receive()");
}
}
+
deregisterConsumer();
}
-
/**
* Perform cleanup to deregister this consumer. This occurs when closing the consumer in both the clean case and in
* the case of an error occurring.
@@ -782,7 +811,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
this.checkNotClosed();
- if (_session == null || _session.isClosed())
+ if ((_session == null) || _session.isClosed())
{
throw new javax.jms.IllegalStateException("Invalid Session");
}
@@ -817,7 +846,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
return _autoClose;
}
-
public boolean isNoConsume()
{
return _noConsume;
@@ -827,10 +855,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_closeWhenNoMessages = b;
- if (_closeWhenNoMessages
- && _synchronousQueue.isEmpty()
- && _receiving.get()
- && _messageListener != null)
+ if (_closeWhenNoMessages && _synchronousQueue.isEmpty() && _receiving.get() && (_messageListener != null))
{
_receivingThread.interrupt();
}
@@ -846,13 +871,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_logger.debug("Rejecting received messages in _receivedDTs (RQ)");
}
- //rollback received but not committed messages
+ // rollback received but not committed messages
while (!_receivedDeliveryTags.isEmpty())
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)" +
- "for consumer with tag:" + _consumerTag);
+ _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)"
+ + "for consumer with tag:" + _consumerTag);
}
Long tag = _receivedDeliveryTags.poll();
@@ -876,14 +901,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
- //rollback pending messages
+ // rollback pending messages
if (_synchronousQueue.size() > 0)
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)" +
- "for consumer with tag:" + _consumerTag);
+ _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)"
+ + "for consumer with tag:" + _consumerTag);
}
+
Iterator iterator = _synchronousQueue.iterator();
while (iterator.hasNext())
@@ -898,13 +924,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_logger.trace("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag());
}
+
iterator.remove();
}
else
{
- _logger.error("Queue contained a :" + o.getClass() +
- " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
+ _logger.error("Queue contained a :" + o.getClass()
+ + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
iterator.remove();
}
}
@@ -919,7 +946,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
-
public String debugIdentity()
{
return String.valueOf(_consumerTag);
diff --git a/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java
index d2ab6bd2c2..d1237cff49 100644
--- a/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java
+++ b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java
@@ -1,5 +1,25 @@
/*
*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+/*
+ *
* Copyright (c) 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -22,10 +42,35 @@ import javax.jms.JMSException;
import org.apache.qpid.AMQException;
/**
+ * JMSException does not accept wrapped exceptions in its constructor. Presumably this is because it is a relatively old
+ * Java exception class, before this was added as a default to Throwable. This exception class accepts wrapped exceptions
+ * as well as error messages, through its constructor, but is a JMSException.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Accept wrapped exceptions as a JMSException.
+ * </table>
+ *
* @author Apache Software Foundation
*/
public class JMSAMQException extends JMSException
{
+ /**
+ * Creates a JMSException, wrapping another exception class.
+ *
+ * @param message The error message.
+ * @param cause The underlying exception that caused this one. May be null if none is to be set.
+ */
+ public JMSAMQException(String message, Exception cause)
+ {
+ super(message);
+
+ if (cause != null)
+ {
+ setLinkedException(cause);
+ }
+ }
+
public JMSAMQException(AMQException s)
{
super(s.getMessage(), String.valueOf(s.getErrorCode()));
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
index 8938130417..af254fbbaf 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
@@ -27,6 +27,7 @@ import javax.jms.JMSException;
import javax.jms.MessageEOFException;
import org.apache.mina.common.ByteBuffer;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -72,7 +73,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
}
AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
- AMQShortString routingKey, ByteBuffer data) throws AMQException
+ AMQShortString routingKey, ByteBuffer data) throws AMQException
{
// TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data);
@@ -93,7 +94,9 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
}
catch (IOException e)
{
- throw new JMSException(e.toString());
+ JMSException jmse = new JMSException(e.toString());
+ jmse.setLinkedException(e);
+ throw jmse;
}
}
@@ -112,6 +115,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
{
return null;
}
+
int pos = _data.position();
_data.rewind();
// one byte left is for the end of frame marker
@@ -119,12 +123,14 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
{
// this is really redundant since pos must be zero
_data.position(pos);
+
return null;
}
else
{
String data = _data.getString(Charset.forName("UTF8").newDecoder());
_data.position(pos);
+
return data;
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 41a143c544..f87b4027f6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -33,12 +33,7 @@ import javax.jms.MessageNotWriteableException;
import org.apache.commons.collections.map.ReferenceMap;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQUndefinedDestination;
-import org.apache.qpid.client.BasicMessageConsumer;
-import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.client.*;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
@@ -184,7 +179,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
}
catch (URLSyntaxException e)
{
- throw new JMSException("Illegal value in JMS_ReplyTo property: " + replyToEncoding);
+ throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e);
}
_destinationCache.put(replyToEncoding, dest);
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
index 6352f7029f..348a0bd152 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
@@ -384,7 +384,9 @@ public final class JMSHeaderAdapter
}
catch (AMQPInvalidClassException aice)
{
- throw new MessageFormatException("Only primatives are allowed object is:" + object.getClass());
+ MessageFormatException mfe = new MessageFormatException("Only primatives are allowed object is:" + object.getClass());
+ mfe.setLinkedException(aice);
+ throw mfe;
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
index df1400b167..caf8741280 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
@@ -33,6 +33,7 @@ import javax.jms.MessageFormatException;
import javax.jms.ObjectMessage;
import org.apache.mina.common.ByteBuffer;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -61,14 +62,15 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
_data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
_data.setAutoExpand(true);
}
+
getContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING);
}
/**
* Creates read only message for delivery to consumers
*/
- JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
- AMQShortString routingKey, ByteBuffer data) throws AMQException
+ JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey,
+ ByteBuffer data) throws AMQException
{
super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data);
}
@@ -79,6 +81,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
{
_data.release();
}
+
_data = null;
}
@@ -116,11 +119,13 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
}
catch (IOException e)
{
- throw new MessageFormatException("Message not serializable: " + e);
+ MessageFormatException mfe = new MessageFormatException("Message not serializable: " + e);
+ mfe.setLinkedException(e);
+ throw mfe;
}
}
-
+
public Serializable getObject() throws JMSException
{
ObjectInputStream in = null;
@@ -133,17 +138,20 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
{
_data.rewind();
in = new ObjectInputStream(_data.asInputStream());
+
return (Serializable) in.readObject();
}
catch (IOException e)
{
- e.printStackTrace();
- throw new MessageFormatException("Could not deserialize message: " + e);
+ MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e);
+ mfe.setLinkedException(e);
+ throw mfe;
}
catch (ClassNotFoundException e)
{
- e.printStackTrace();
- throw new MessageFormatException("Could not deserialize message: " + e);
+ MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e);
+ mfe.setLinkedException(e);
+ throw mfe;
}
finally
{
@@ -162,8 +170,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
}
}
catch (IOException ignore)
- {
- }
+ { }
}
private static String toString(ByteBuffer data)
@@ -172,6 +179,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
{
return null;
}
+
int pos = data.position();
try
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java b/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java
index 4504498308..dcca55e6c2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java
@@ -31,15 +31,16 @@ import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.RealmCallback;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.log4j.Logger;
import com.sun.crypto.provider.HmacMD5;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+
public class UsernameHashedPasswordCallbackHandler implements AMQCallbackHandler
{
private static final Logger _logger = Logger.getLogger(UsernameHashedPasswordCallbackHandler.class);
-
private AMQProtocolSession _protocolSession;
public void initialise(AMQProtocolSession protocolSession)
@@ -58,14 +59,15 @@ public class UsernameHashedPasswordCallbackHandler implements AMQCallbackHandler
}
else if (cb instanceof PasswordCallback)
{
-
try
{
((PasswordCallback) cb).setPassword(getHash(_protocolSession.getPassword()));
}
- catch (Exception e)
+ catch (NoSuchAlgorithmException e)
{
- throw new UnsupportedCallbackException(cb);
+ UnsupportedCallbackException uce = new UnsupportedCallbackException(cb);
+ uce.initCause(e);
+ throw uce;
}
}
else
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java b/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java
index 104c5bfc44..1ec3adc2eb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -33,7 +33,7 @@ public class AMQNoTransportForProtocolException extends AMQTransportConnectionEx
public AMQNoTransportForProtocolException(BrokerDetails details, String message)
{
- super(message);
+ super(null, message, null);
_details = details;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java b/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java
index 4b17661bc3..fec7ff693c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,12 +21,12 @@
package org.apache.qpid.client.transport;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
public class AMQTransportConnectionException extends AMQException
{
- public AMQTransportConnectionException(String message)
+ public AMQTransportConnectionException(AMQConstant errorCode, String message, Throwable cause)
{
- super(message);
-
+ super(errorCode, message, cause);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 8368eee125..0bc83e9804 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,12 +26,14 @@ import java.util.Iterator;
import java.util.Map;
import org.apache.log4j.Logger;
+
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
+
import org.apache.qpid.client.AMQBrokerDetails;
import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.jms.BrokerDetails;
@@ -64,13 +66,11 @@ public class TransportConnection
int transport = getTransport(details.getTransport());
if (transport == -1)
-
{
throw new AMQNoTransportForProtocolException(details);
}
if (transport == _currentInstance)
-
{
if (transport == VM)
{
@@ -88,40 +88,42 @@ public class TransportConnection
_currentInstance = transport;
switch (transport)
-
{
- case TCP:
- _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
- {
- public IoConnector newSocketConnector()
+
+ case TCP:
+ _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
{
- SocketConnector result;
- //FIXME - this needs to be sorted to use the new Mina MultiThread SA.
- if (Boolean.getBoolean("qpidnio"))
+ public IoConnector newSocketConnector()
{
- _logger.fatal("Using Qpid NIO - sysproperty 'qpidnio' is set.");
-// result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector
+ SocketConnector result;
+ // FIXME - this needs to be sorted to use the new Mina MultiThread SA.
+ if (Boolean.getBoolean("qpidnio"))
+ {
+ _logger.fatal("Using Qpid NIO - sysproperty 'qpidnio' is set.");
+ // result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector
+ }
+ // else
+
+ {
+ _logger.info("Using Mina NIO");
+ result = new SocketConnector(); // non-blocking connector
+ }
+
+ // Don't have the connector's worker thread wait around for other connections (we only use
+ // one SocketConnector per connection at the moment anyway). This allows short-running
+ // clients (like unit tests) to complete quickly.
+ result.setWorkerTimeout(0);
+
+ return result;
}
-// else
- {
- _logger.info("Using Mina NIO");
- result = new SocketConnector(); // non-blocking connector
- }
-
- // Don't have the connector's worker thread wait around for other connections (we only use
- // one SocketConnector per connection at the moment anyway). This allows short-running
- // clients (like unit tests) to complete quickly.
- result.setWorkerTimeout(0);
+ });
+ break;
- return result;
- }
- });
- break;
- case VM:
- {
- _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
- break;
- }
+ case VM:
+ {
+ _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
+ break;
+ }
}
return _instance;
@@ -142,7 +144,8 @@ public class TransportConnection
return -1;
}
- private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate) throws AMQVMBrokerCreationException
+ private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate)
+ throws AMQVMBrokerCreationException
{
int port = details.getPort();
@@ -154,14 +157,14 @@ public class TransportConnection
}
else
{
- throw new AMQVMBrokerCreationException(port, "VM Broker on port " + port + " does not exist. Auto create disabled.");
+ throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
+ + " does not exist. Auto create disabled.", null);
}
}
return new VmPipeTransportConnection(port);
}
-
public static void createVMBroker(int port) throws AMQVMBrokerCreationException
{
if (_acceptor == null)
@@ -192,7 +195,7 @@ public class TransportConnection
{
_logger.error(e);
- //Try and unbind provider
+ // Try and unbind provider
try
{
VmPipeAddress pipe = new VmPipeAddress(port);
@@ -203,7 +206,7 @@ public class TransportConnection
}
catch (Exception ignore)
{
- //ignore
+ // ignore
}
if (provider == null)
@@ -227,7 +230,7 @@ public class TransportConnection
because = e.getCause().toString();
}
- throw new AMQVMBrokerCreationException(port, because + " Stopped binding of InVM Qpid.AMQP");
+ throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e);
}
}
}
@@ -246,14 +249,14 @@ public class TransportConnection
// can't use introspection to get Provider as it is a server class.
// need to go straight to IoHandlerAdapter but that requries the queues and exchange from the ApplicationRegistry which we can't access.
- //get right constructor and pass in instancec ID - "port"
+ // get right constructor and pass in instancec ID - "port"
IoHandlerAdapter provider;
try
{
- Class[] cnstr = {Integer.class};
- Object[] params = {port};
+ Class[] cnstr = { Integer.class };
+ Object[] params = { port };
provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
- //Give the broker a second to create
+ // Give the broker a second to create
_logger.info("Created VMBroker Instance:" + port);
}
catch (Exception e)
@@ -270,8 +273,10 @@ public class TransportConnection
because = e.getCause().toString();
}
-
- throw new AMQVMBrokerCreationException(port, because + " Stopped InVM Qpid.AMQP creation");
+ AMQVMBrokerCreationException amqbce =
+ new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null);
+ amqbce.initCause(e);
+ throw amqbce;
}
return provider;
diff --git a/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java b/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java
index 607ddcc26a..4b2982fe9c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java
+++ b/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,19 +21,25 @@
package org.apache.qpid.client.vmbroker;
import org.apache.qpid.client.transport.AMQTransportConnectionException;
+import org.apache.qpid.protocol.AMQConstant;
public class AMQVMBrokerCreationException extends AMQTransportConnectionException
{
private int _port;
+ /**
+ * @param port
+ *
+ * @deprecated
+ */
public AMQVMBrokerCreationException(int port)
{
- this(port, "Unable to create vm broker");
+ this(null, port, "Unable to create vm broker", null);
}
- public AMQVMBrokerCreationException(int port, String message)
+ public AMQVMBrokerCreationException(AMQConstant errorCode, int port, String message, Throwable cause)
{
- super(message);
+ super(errorCode, message, cause);
_port = port;
}
diff --git a/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java b/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java
index 9adf04e182..6ad3fb4bae 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java
@@ -101,7 +101,7 @@ public class FailoverPolicy
}
catch (Exception cnfe)
{
- throw new IllegalArgumentException("Unknown failover method:" + failoverMethod);
+ throw new IllegalArgumentException("Unknown failover method:" + failoverMethod, cnfe);
}
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/testutil/Config.java b/java/client/src/test/java/org/apache/qpid/testutil/Config.java
index 8109d20a33..b777cf93b6 100644
--- a/java/client/src/test/java/org/apache/qpid/testutil/Config.java
+++ b/java/client/src/test/java/org/apache/qpid/testutil/Config.java
@@ -172,7 +172,7 @@ public class Config
}
catch(NumberFormatException e)
{
- throw new RuntimeException("Bad port number: " + value);
+ throw new RuntimeException("Bad port number: " + value, e);
}
}
else if("-name".equalsIgnoreCase(key))
diff --git a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
index f2afa472ab..195ed79dab 100644
--- a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
+++ b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
@@ -3,6 +3,7 @@ package org.apache.qpid.testutil;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.JMSAMQException;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.log4j.Logger;
@@ -70,7 +71,7 @@ public class QpidClientConnection implements ExceptionListener
}
catch (URLSyntaxException e)
{
- throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage());
+ throw new JMSAMQException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage(), e);
}
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQException.java b/java/common/src/main/java/org/apache/qpid/AMQException.java
index 32c1e76a39..2f04a01f53 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQException.java
@@ -22,11 +22,41 @@ package org.apache.qpid;
import org.apache.qpid.protocol.AMQConstant;
-/** Generic AMQ exception. */
+/**
+ * AMQException forms the root exception of all exceptions relating to the AMQ protocol. It provides space to associate
+ * an AMQ error code with the exception, which is a numberic value, with a meaning defined by the protocol.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represents an exception condition associated with an AMQ protocol error code.
+ * </table>
+ *
+ * @todo This exception class is also used as a generic exception throughout Qpid code. This usage may not be strictly
+ * correct if this is to signify a protocol exception. Should review.
+ */
public class AMQException extends Exception
{
+ /** Holds the AMQ error code constant associated with this exception. */
private AMQConstant _errorCode;
+ /**
+ * Creates an exception with an optional error code, optional message and optional underlying cause.
+ *
+ * @param errorCode The error code. May be null if not to be set.
+ * @param msg The exception message. May be null if not to be set.
+ * @param t The underlying cause of the exception. May be null if not to be set.
+ */
+ public AMQException(AMQConstant errorCode, String msg, Throwable t)
+ {
+ super(((msg == null) ? "" : msg) + ((errorCode == null) ? "" : (" [error code " + errorCode + "]")), t);
+ _errorCode = errorCode;
+ }
+
+ /**
+ * @param message
+ *
+ * @deprecated Use {@link #AMQException(org.apache.qpid.protocol.AMQConstant, String, Throwable)} instead.
+ */
public AMQException(String message)
{
super(message);
@@ -34,6 +64,12 @@ public class AMQException extends Exception
_errorCode = AMQConstant.getConstant(-1);
}
+ /**
+ * @param msg
+ * @param t
+ *
+ * @deprecated Use {@link #AMQException(org.apache.qpid.protocol.AMQConstant, String, Throwable)} instead.
+ */
public AMQException(String msg, Throwable t)
{
super(msg, t);
@@ -41,21 +77,25 @@ public class AMQException extends Exception
_errorCode = AMQConstant.getConstant(-1);
}
- public AMQException(AMQConstant errorCode, String msg, Throwable t)
- {
- super(msg + " [error code " + errorCode + ']', t);
- _errorCode = errorCode;
- }
-
+ /**
+ * @param errorCode
+ * @param msg
+ *
+ * @deprecated Use {@link #AMQException(org.apache.qpid.protocol.AMQConstant, String, Throwable)} instead.
+ */
public AMQException(AMQConstant errorCode, String msg)
{
super(msg + " [error code " + errorCode + ']');
_errorCode = errorCode;
}
+ /**
+ * Gets the AMQ protocol exception code associated with this exception.
+ *
+ * @return The AMQ protocol exception code associated with this exception.
+ */
public AMQConstant getErrorCode()
{
return _errorCode;
}
-
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
index 43f888c029..9f36448986 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
@@ -94,7 +94,7 @@ public class AMQDataBlockDecoder
if(bodyFactory == null)
{
- throw new AMQFrameDecodingException("Unsupported frame type: " + type);
+ throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null);
}
final int channel = in.getUnsignedShort();
@@ -103,8 +103,8 @@ public class AMQDataBlockDecoder
// bodySize can be zero
if (channel < 0 || bodySize < 0)
{
- throw new AMQFrameDecodingException("Undecodable frame: type = " + type + " channel = " + channel +
- " bodySize = " + bodySize);
+ throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type + " channel = " + channel +
+ " bodySize = " + bodySize, null);
}
AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
@@ -113,7 +113,7 @@ public class AMQDataBlockDecoder
byte marker = in.get();
if ((marker & 0xFF) != 0xCE)
{
- throw new AMQFrameDecodingException("End of frame marker not found. Read " + marker + " length=" + bodySize + " type=" + type);
+ throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize + " type=" + type, null);
}
return frame;
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java
index a3d4513240..171da76771 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java
@@ -21,16 +21,17 @@
package org.apache.qpid.framing;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
public class AMQFrameDecodingException extends AMQException
{
- public AMQFrameDecodingException(String message)
+ /*public AMQFrameDecodingException(String message)
{
super(message);
- }
+ }*/
- public AMQFrameDecodingException(String message, Throwable t)
+ public AMQFrameDecodingException(AMQConstant errorCode, String message, Throwable t)
{
- super(message, t);
+ super(errorCode, message, t);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
index 8b784fa3f7..008afb490e 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
@@ -341,7 +341,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
}
catch (AMQFrameDecodingException e)
{
- throw new RuntimeException("Error in content header data: " + e);
+ throw new RuntimeException("Error in content header data: " + e, e);
}
final int endPos = buffer.position();
@@ -381,7 +381,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
}
catch (AMQFrameDecodingException e)
{
- throw new RuntimeException("Error in content header data: " + e);
+ throw new RuntimeException("Error in content header data: " + e, e);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
index 7dac018872..712eb437db 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
@@ -49,7 +49,7 @@ public class ContentHeaderPropertiesFactory
}
else
{
- throw new AMQFrameDecodingException("Unsupport content header class id: " + classId);
+ throw new AMQFrameDecodingException(null, "Unsupport content header class id: " + classId, null);
}
properties.populatePropertiesFromBuffer(buffer, propertyFlags, size);
return properties;
diff --git a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
index fcd8d47d32..916b476185 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
@@ -152,31 +152,31 @@ public class VersionSpecificRegistry
}
catch (NullPointerException e)
{
- throw new AMQFrameDecodingException("Class " + classID + " unknown in AMQP version " + _protocolMajorVersion
- + "-" + _protocolMinorVersion + " (while trying to decode class " + classID + " method " + methodID + ".");
+ throw new AMQFrameDecodingException(null, "Class " + classID + " unknown in AMQP version " + _protocolMajorVersion
+ + "-" + _protocolMinorVersion + " (while trying to decode class " + classID + " method " + methodID + ".", e);
}
catch (IndexOutOfBoundsException e)
{
if (classID >= _registry.length)
{
- throw new AMQFrameDecodingException("Class " + classID + " unknown in AMQP version " + _protocolMajorVersion
+ throw new AMQFrameDecodingException(null, "Class " + classID + " unknown in AMQP version " + _protocolMajorVersion
+ "-" + _protocolMinorVersion + " (while trying to decode class " + classID + " method " + methodID
- + ".");
+ + ".", e);
}
else
{
- throw new AMQFrameDecodingException("Method " + methodID + " unknown in AMQP version "
+ throw new AMQFrameDecodingException(null, "Method " + methodID + " unknown in AMQP version "
+ _protocolMajorVersion + "-" + _protocolMinorVersion + " (while trying to decode class " + classID
- + " method " + methodID + ".");
+ + " method " + methodID + ".", e);
}
}
if (bodyFactory == null)
{
- throw new AMQFrameDecodingException("Method " + methodID + " unknown in AMQP version " + _protocolMajorVersion
- + "-" + _protocolMinorVersion + " (while trying to decode class " + classID + " method " + methodID + ".");
+ throw new AMQFrameDecodingException(null, "Method " + methodID + " unknown in AMQP version " + _protocolMajorVersion
+ + "-" + _protocolMinorVersion + " (while trying to decode class " + classID + " method " + methodID + ".", null);
}
return bodyFactory.newInstance(_protocolMajorVersion, _protocolMinorVersion, classID, methodID, in, size);
diff --git a/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java b/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java
index 04381d66a0..14db74438f 100644
--- a/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java
+++ b/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java
@@ -49,7 +49,7 @@ public abstract class AbstractConfig
}
catch(NumberFormatException e)
{
- throw new RuntimeException(msg + ": " + i);
+ throw new RuntimeException(msg + ": " + i, e);
}
}
@@ -61,7 +61,7 @@ public abstract class AbstractConfig
}
catch(NumberFormatException e)
{
- throw new RuntimeException(msg + ": " + i);
+ throw new RuntimeException(msg + ": " + i, e);
}
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java b/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java
index 44285efd96..a0248a8f79 100644
--- a/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java
+++ b/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java
@@ -22,6 +22,7 @@ package org.apache.qpid.config;
import org.apache.qpid.config.ConnectionFactoryInitialiser;
import org.apache.qpid.config.ConnectorConfig;
+import org.apache.qpid.client.JMSAMQException;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
@@ -63,11 +64,11 @@ public class JBossConnectionFactoryInitialiser implements ConnectionFactoryIniti
}
catch (NamingException e)
{
- throw new JMSException("Unable to lookup object: " + e);
+ throw new JMSAMQException("Unable to lookup object: " + e, e);
}
catch (Exception e)
{
- throw new JMSException("Error creating topic: " + e);
+ throw new JMSAMQException("Error creating topic: " + e, e);
}
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/Config.java b/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
index 342b28ca17..d5c0979399 100644
--- a/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
+++ b/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
@@ -221,7 +221,7 @@ public class Config extends AbstractConfig implements ConnectorConfig
}
catch(NumberFormatException e)
{
- throw new RuntimeException("Bad port number: " + value);
+ throw new RuntimeException("Bad port number: " + value, e);
}
}
else if("-payload".equalsIgnoreCase(key))
diff --git a/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java
index 53c4e7bb8f..80773c102d 100644
--- a/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java
+++ b/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java
@@ -3,6 +3,7 @@ package org.apache.qpid.testutil;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.JMSAMQException;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.log4j.Logger;
@@ -70,7 +71,7 @@ public class QpidClientConnection implements ExceptionListener
}
catch (URLSyntaxException e)
{
- throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage());
+ throw new JMSAMQException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage(), e);
}
}
}