diff options
author | Robert Greig <rgreig@apache.org> | 2007-04-09 15:26:04 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-04-09 15:26:04 +0000 |
commit | f966c066fc10c1510c244c41958e343c682dd1a1 (patch) | |
tree | ff0d2818fe3729bb6ebdfcb8d654d17b8599538f | |
parent | b8b2e032a4a6a6ad796ce6247c581a0498b5c264 (diff) | |
download | qpid-python-f966c066fc10c1510c244c41958e343c682dd1a1.tar.gz |
Stopped throwing away exception causes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@526776 13f79535-47bb-0310-9956-ffa450edef68
37 files changed, 745 insertions, 524 deletions
diff --git a/gentools/templ.java/MethodRegistryClass.tmpl b/gentools/templ.java/MethodRegistryClass.tmpl index 80bc61d4ce..388a1a936f 100644 --- a/gentools/templ.java/MethodRegistryClass.tmpl +++ b/gentools/templ.java/MethodRegistryClass.tmpl @@ -60,9 +60,9 @@ public class MainRegistry if (bodyFactory == null) { - throw new AMQFrameDecodingException( + throw new AMQFrameDecodingException(null, "Unable to find a suitable decoder for class " + classID + " and method " + - methodID + " in AMQP version " + major + "-" + minor + "."); + methodID + " in AMQP version " + major + "-" + minor + ".", null); } return bodyFactory.newInstance(major, minor, in, size); diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index bcbc8f52f6..db9f3a7421 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -1,5 +1,25 @@ /* * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +/* + * * Copyright (c) 2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -22,8 +42,12 @@ import javax.management.MBeanException; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; +import org.apache.commons.configuration.Configuration; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.configuration.Configurator; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; @@ -36,9 +60,6 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.configuration.Configurator; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.commons.configuration.Configuration; /** * This MBean implements the broker management interface and exposes the @@ -82,8 +103,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr * @param autoDelete * @throws JMException */ - public void createNewExchange(String exchangeName, String type, boolean durable, boolean autoDelete) - throws JMException + public void createNewExchange(String exchangeName, String type, boolean durable, boolean autoDelete) throws JMException { try { @@ -92,7 +112,9 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr Exchange exchange = _exchangeRegistry.getExchange(new AMQShortString(exchangeName)); if (exchange == null) { - exchange = _exchangeFactory.createExchange(new AMQShortString(exchangeName), new AMQShortString(type), durable, autoDelete, 0); + exchange = + _exchangeFactory.createExchange(new AMQShortString(exchangeName), new AMQShortString(type), durable, + autoDelete, 0); _exchangeRegistry.registerExchange(exchange); } else @@ -155,22 +177,27 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr { ownerShortString = new AMQShortString(owner); } + queue = new AMQQueue(new AMQShortString(queueName), durable, ownerShortString, false, getVirtualHost()); if (queue.isDurable() && !queue.isAutoDelete()) { _messageStore.createQueue(queue); } - Configuration virtualHostDefaultQueueConfiguration = VirtualHostConfiguration.getDefaultQueueConfiguration(queue); + Configuration virtualHostDefaultQueueConfiguration = + VirtualHostConfiguration.getDefaultQueueConfiguration(queue); if (virtualHostDefaultQueueConfiguration != null) { Configurator.configure(queue, virtualHostDefaultQueueConfiguration); } + _queueRegistry.registerQueue(queue); } catch (AMQException ex) { - throw new MBeanException(new JMException(ex.getMessage()),"Error in creating queue " + queueName); + JMException jme = new JMException(ex.getMessage()); + jme.initCause(ex); + throw new MBeanException(jme, "Error in creating queue " + queueName); } } @@ -201,7 +228,9 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr } catch (AMQException ex) { - throw new MBeanException(new JMException(ex.getMessage()), "Error in deleting queue " + queueName); + JMException jme = new JMException(ex.getMessage()); + jme.initCause(ex); + throw new MBeanException(jme, "Error in deleting queue " + queueName); } } @@ -212,7 +241,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr // This will have a single instance for a virtual host, so not having the name property in the ObjectName public ObjectName getObjectName() throws MalformedObjectNameException - { + { return getObjectNameForSingleInstanceMBean(); } } // End of MBean class diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index 1d26abb63f..fd010e8923 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -36,14 +36,17 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.cli.PosixParser; import org.apache.commons.configuration.ConfigurationException; + import org.apache.log4j.BasicConfigurator; import org.apache.log4j.Logger; import org.apache.log4j.xml.DOMConfigurator; + import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoAcceptor; import org.apache.mina.common.SimpleByteBufferAllocator; import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.pool.ReadWriteThreadModel; @@ -59,7 +62,7 @@ import org.apache.qpid.url.URLSyntaxException; * Main entry point for AMQPD. * */ -@SuppressWarnings({"AccessStaticViaInstance"}) +@SuppressWarnings({ "AccessStaticViaInstance" }) public class Main { private static final Logger _logger = Logger.getLogger(Main.class); @@ -70,9 +73,9 @@ public class Main protected static class InitException extends Exception { - InitException(String msg) + InitException(String msg, Throwable cause) { - super(msg); + super(msg, cause); } } @@ -93,6 +96,7 @@ public class Main try { commandLine = new PosixParser().parse(options, args); + return true; } catch (ParseException e) @@ -100,6 +104,7 @@ public class Main System.err.println("Error: " + e.getMessage()); HelpFormatter formatter = new HelpFormatter(); formatter.printHelp("Qpid", options, true); + return false; } } @@ -108,17 +113,26 @@ public class Main { Option help = new Option("h", "help", false, "print this message"); Option version = new Option("v", "version", false, "print the version information and exit"); - Option configFile = OptionBuilder.withArgName("file").hasArg().withDescription("use given configuration file"). - withLongOpt("config").create("c"); - Option port = OptionBuilder.withArgName("port").hasArg().withDescription("listen on the specified port. Overrides any value in the config file"). - withLongOpt("port").create("p"); - Option bind = OptionBuilder.withArgName("bind").hasArg().withDescription("bind to the specified address. Overrides any value in the config file"). - withLongOpt("bind").create("b"); - Option logconfig = OptionBuilder.withArgName("logconfig").hasArg().withDescription("use the specified log4j xml configuration file. By " + - "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME + " in the same directory as the configuration file"). - withLongOpt("logconfig").create("l"); - Option logwatchconfig = OptionBuilder.withArgName("logwatch").hasArg().withDescription("monitor the log file configuration file for changes. Units are seconds. " + - "Zero means do not check for changes.").withLongOpt("logwatch").create("w"); + Option configFile = + OptionBuilder.withArgName("file").hasArg().withDescription("use given configuration file").withLongOpt("config") + .create("c"); + Option port = + OptionBuilder.withArgName("port").hasArg() + .withDescription("listen on the specified port. Overrides any value in the config file") + .withLongOpt("port").create("p"); + Option bind = + OptionBuilder.withArgName("bind").hasArg() + .withDescription("bind to the specified address. Overrides any value in the config file") + .withLongOpt("bind").create("b"); + Option logconfig = + OptionBuilder.withArgName("logconfig").hasArg() + .withDescription("use the specified log4j xml configuration file. By " + + "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME + + " in the same directory as the configuration file").withLongOpt("logconfig").create("l"); + Option logwatchconfig = + OptionBuilder.withArgName("logwatch").hasArg() + .withDescription("monitor the log file configuration file for changes. Units are seconds. " + + "Zero means do not check for changes.").withLongOpt("logwatch").create("w"); options.addOption(help); options.addOption(version); @@ -146,7 +160,7 @@ public class Main boolean first = true; for (ProtocolVersion pv : ProtocolVersion.getSupportedProtocolVersions()) { - if(first) + if (first) { first = false; } @@ -154,9 +168,11 @@ public class Main { protocol.append(", "); } + protocol.append(pv.getMajorVersion()).append('-').append(pv.getMinorVersion()); } + System.out.println(ver + " (" + protocol + ")"); } else @@ -182,7 +198,6 @@ public class Main } } - protected void startup() throws InitException, ConfigurationException, Exception { final String QpidHome = System.getProperty("QPID_HOME"); @@ -197,7 +212,7 @@ public class Main error = error + "\nNote: Qpid_HOME is not set."; } - throw new InitException(error); + throw new InitException(error, null); } else { @@ -222,8 +237,8 @@ public class Main _logger.info("Starting Qpid.AMQP broker"); - ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance(). - getConfiguredObject(ConnectorConfiguration.class); + ConnectorConfiguration connectorConfig = + ApplicationRegistry.getInstance().getConfiguredObject(ConnectorConfiguration.class); ByteBuffer.setUseDirectBuffers(connectorConfig.enableDirectBuffers); @@ -245,7 +260,7 @@ public class Main } catch (NumberFormatException e) { - throw new InitException("Invalid port: " + portStr); + throw new InitException("Invalid port: " + portStr, e); } } @@ -260,19 +275,21 @@ public class Main int totalVHosts = ((Collection) virtualHosts).size(); for (int vhost = 0; vhost < totalVHosts; vhost++) { - setupVirtualHosts(configFile.getParent() , (String)((List)virtualHosts).get(vhost)); + setupVirtualHosts(configFile.getParent(), (String) ((List) virtualHosts).get(vhost)); } } else { - setupVirtualHosts(configFile.getParent() , (String)virtualHosts); + setupVirtualHosts(configFile.getParent(), (String) virtualHosts); } } + bind(port, connectorConfig); } - protected void setupVirtualHosts(String configFileParent, String configFilePath) throws ConfigurationException, AMQException, URLSyntaxException + protected void setupVirtualHosts(String configFileParent, String configFilePath) + throws ConfigurationException, AMQException, URLSyntaxException { String configVar = "${conf}"; @@ -281,7 +298,7 @@ public class Main configFilePath = configFileParent + configFilePath.substring(configVar.length()); } - if (configFilePath.indexOf(".xml") != -1 ) + if (configFilePath.indexOf(".xml") != -1) { VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(configFilePath); vHostConfig.performBindings(); @@ -294,11 +311,12 @@ public class Main String[] fileNames = virtualHostDir.list(); - for (int each=0; each < fileNames.length; each++) + for (int each = 0; each < fileNames.length; each++) { if (fileNames[each].endsWith(".xml")) { - VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(configFilePath+"/"+fileNames[each]); + VirtualHostConfiguration vHostConfig = + new VirtualHostConfiguration(configFilePath + "/" + fileNames[each]); vHostConfig.performBindings(); } } @@ -315,7 +333,7 @@ public class Main try { - //IoAcceptor acceptor = new SocketAcceptor(connectorConfig.processors); + // IoAcceptor acceptor = new SocketAcceptor(connectorConfig.processors); IoAcceptor acceptor = connectorConfig.createAcceptor(); SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig(); SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig(); @@ -330,7 +348,7 @@ public class Main { sconfig.setThreadModel(ReadWriteThreadModel.getInstance()); } - + if (!connectorConfig.enableSSL || !connectorConfig.sslOnly) { AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler(); @@ -343,6 +361,7 @@ public class Main { bindAddress = new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port); } + acceptor.bind(bindAddress, handler, sconfig); _logger.info("Qpid.AMQP listening on non-SSL address " + bindAddress); } @@ -352,8 +371,7 @@ public class Main AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler(); try { - acceptor.bind(new InetSocketAddress(connectorConfig.sslPort), - handler, sconfig); + acceptor.bind(new InetSocketAddress(connectorConfig.sslPort), handler, sconfig); _logger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort); } catch (IOException e) @@ -391,10 +409,12 @@ public class Main throw new Exception("Error parsing IP address: " + address, e); } } + if (index != 4) { throw new Exception("Invalid IP address: " + address); } + return ip; } @@ -407,16 +427,17 @@ public class Main } catch (NumberFormatException e) { - System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be " + - "a non-negative integer. Using default of zero (no watching configured"); + System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be " + + "a non-negative integer. Using default of zero (no watching configured"); } + if (logConfigFile.exists() && logConfigFile.canRead()) { System.out.println("Configuring logger using configuration file " + logConfigFile.getAbsolutePath()); if (logWatchTime > 0) { - System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every " + - logWatchTime + " seconds"); + System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every " + + logWatchTime + " seconds"); // log4j expects the watch interval in milliseconds DOMConfigurator.configureAndWatch(logConfigFile.getAbsolutePath(), logWatchTime * 1000); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java index cf0253b4be..2ede1e49d7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,36 +20,37 @@ */ package org.apache.qpid.server.management; +import java.io.IOException; import java.lang.management.ManagementFactory; -import java.util.Map; -import java.util.HashMap; +import java.rmi.RemoteException; import java.rmi.registry.LocateRegistry; import java.rmi.registry.Registry; -import java.rmi.RemoteException; import java.rmi.server.UnicastRemoteObject; -import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import javax.management.JMException; import javax.management.MBeanServer; import javax.management.MBeanServerFactory; -import javax.management.remote.JMXServiceURL; import javax.management.remote.JMXConnectorServer; import javax.management.remote.JMXConnectorServerFactory; +import javax.management.remote.JMXServiceURL; import javax.management.remote.MBeanServerForwarder; +import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; import javax.security.auth.callback.NameCallback; import javax.security.auth.callback.PasswordCallback; -import javax.security.auth.callback.Callback; import javax.security.auth.callback.UnsupportedCallbackException; import javax.security.auth.login.AccountNotFoundException; import javax.security.sasl.AuthorizeCallback; import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; -import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; -import org.apache.qpid.server.security.auth.database.PrincipalDatabase; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.security.auth.database.PrincipalDatabase; +import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; /** * This class starts up an MBeanserver. If out of the box agent is being used then there are no security features @@ -76,13 +77,15 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry boolean security = appRegistry.getConfiguration().getBoolean("management.security-enabled", true); int port = appRegistry.getConfiguration().getInt("management.jmxport", 8999); - _mbeanServer = platformServer ? ManagementFactory.getPlatformMBeanServer() - : MBeanServerFactory.createMBeanServer(ManagedObject.DOMAIN); + _mbeanServer = + platformServer ? ManagementFactory.getPlatformMBeanServer() + : MBeanServerFactory.createMBeanServer(ManagedObject.DOMAIN); // Check if the "QPID_OPTS" is set to use Out of the Box JMXAgent if (areOutOfTheBoxJMXOptionsSet()) { _log.info("JMX: Using the out of the box JMX Agent"); + return; } @@ -95,18 +98,18 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry Map env = new HashMap(); env.put("jmx.remote.profiles", "SASL/PLAIN"); - //env.put("jmx.remote.profiles", "SASL/CRAM-MD5"); + // env.put("jmx.remote.profiles", "SASL/CRAM-MD5"); Map<String, PrincipalDatabase> map = appRegistry.getDatabaseManager().getDatabases(); Map.Entry<String, PrincipalDatabase> entry = map.entrySet().iterator().next(); - + // Callback handler used by the PLAIN SASL server mechanism to perform user authentication /* PlainInitialiser plainInitialiser = new PlainInitialiser(); plainInitialiser.initialise(entry.getValue()); env.put("jmx.remote.sasl.callback.handler", plainInitialiser.getCallbackHandler()); - */ - + */ + env.put("jmx.remote.sasl.callback.handler", new UserCallbackHandler(entry.getValue())); // Enable the SSL security and server authentication @@ -115,7 +118,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry SslRMIServerSocketFactory ssf = new SslRMIServerSocketFactory(); env.put(RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE, csf); env.put(RMIConnectorServer.RMI_SERVER_SOCKET_FACTORY_ATTRIBUTE, ssf); - */ + */ try { @@ -162,7 +165,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry public void registerObject(ManagedObject managedObject) throws JMException { - _mbeanServer.registerMBean(managedObject, managedObject.getObjectName()); + _mbeanServer.registerMBean(managedObject, managedObject.getObjectName()); } public void unregisterObject(ManagedObject managedObject) throws JMException @@ -180,9 +183,10 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry { return true; } + if (System.getProperty("com.sun.management.jmxremote.port") != null) { - return true; + return true; } return false; @@ -248,21 +252,24 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry boolean authorized = false; // Process retrieval of password; can get password if username is available in NameCallback - if (ncb != null && pcb != null) + if ((ncb != null) && (pcb != null)) { String username = ncb.getDefaultName(); try { - authorized =_principalDatabase.verifyPassword(new UsernamePrincipal(username), pcb.getPassword()); + authorized = _principalDatabase.verifyPassword(new UsernamePrincipal(username), pcb.getPassword()); } catch (AccountNotFoundException e) { - throw new IOException("User not authorized. " + e); + IOException ioe = new IOException("User not authorized. " + e); + ioe.initCause(e); + throw ioe; } } + if (!authorized) { - throw new IOException("User not authorized."); + throw new IOException("User not authorized."); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 7a32848c44..e7e7f5c22f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -347,7 +347,9 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que } catch (AMQException e) { - throw new JMException("Error creating header attributes list: " + e); + JMException jme = new JMException("Error creating header attributes list: " + e); + jme.initCause(e); + throw jme; } } @@ -381,7 +383,9 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que } catch (AMQException e) { - throw new JMException("Error creating message contents: " + e); + JMException jme = new JMException("Error creating message contents: " + e); + jme.initCause(e); + throw jme; } return _messageList; diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 8806ac0516..89f0b7b39d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -153,7 +153,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry { _logger.error("Error configuring application: " + e, e); //throw new AMQBrokerCreationException(instanceID, "Unable to create Application Registry instance " + instanceID); - throw new RuntimeException("Unable to create Application Registry"); + throw new RuntimeException("Unable to create Application Registry", e); } } else @@ -193,7 +193,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry catch (Exception e) { _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor"); - throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor"); + throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e); } Configurator.configure(instance); _configuredObjects.put(instanceType, instance); diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java index 1ccb13cf62..35d036d20f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java @@ -116,7 +116,9 @@ public class AccessManagerImpl implements AccessManager } catch (Exception e) { - throw new ConfigurationException(e.getMessage(), e.getCause()); + ConfigurationException ce = new ConfigurationException(e.getMessage(), e.getCause()); + ce.initCause(e); + throw ce; } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java index 2e019e1cfb..8d2bf6d5ab 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java @@ -1,38 +1,40 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. * */ package org.apache.qpid.server.security.auth.database; +import java.io.FileNotFoundException; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; -import org.apache.qpid.server.security.auth.database.PrincipalDatabase; -import org.apache.qpid.configuration.PropertyUtils; + import org.apache.log4j.Logger; -import java.util.Map; -import java.util.List; -import java.util.HashMap; -import java.lang.reflect.Method; -import java.io.FileNotFoundException; +import org.apache.qpid.configuration.PropertyUtils; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.security.auth.database.PrincipalDatabase; +import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatabaseManager { @@ -80,23 +82,26 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab initialisePrincipalDatabase((PrincipalDatabase) o, config, i); String name = databaseNames.get(i); - if (name == null || name.length() == 0) + if ((name == null) || (name.length() == 0)) { throw new Exception("Principal database names must have length greater than or equal to one character"); } + PrincipalDatabase pd = databases.get(name); if (pd != null) { throw new Exception("Duplicate principal database name not provided"); } + _logger.info("Initialised principal database '" + name + "' successfully"); databases.put(name, (PrincipalDatabase) o); } + return databases; } private void initialisePrincipalDatabase(PrincipalDatabase principalDatabase, Configuration config, int index) - throws FileNotFoundException, ConfigurationException + throws FileNotFoundException, ConfigurationException { String baseName = _base + "(" + index + ").attributes.attribute."; List<String> argumentNames = config.getList(baseName + "name"); @@ -104,14 +109,16 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab for (int i = 0; i < argumentNames.size(); i++) { String argName = argumentNames.get(i); - if (argName == null || argName.length() == 0) + if ((argName == null) || (argName.length() == 0)) { throw new ConfigurationException("Argument names must have length >= 1 character"); } + if (Character.isLowerCase(argName.charAt(0))) { argName = Character.toUpperCase(argName.charAt(0)) + argName.substring(1); } + String methodName = "set" + argName; Method method = null; try @@ -125,9 +132,10 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab if (method == null) { - throw new ConfigurationException("No method " + methodName + " found in class " + principalDatabase.getClass() + - " hence unable to configure principal database. The method must be public and " + - "have a single String argument with a void return type"); + throw new ConfigurationException("No method " + methodName + " found in class " + + principalDatabase.getClass() + + " hence unable to configure principal database. The method must be public and " + + "have a single String argument with a void return type"); } try @@ -138,11 +146,11 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab { if (ite instanceof ConfigurationException) { - throw(ConfigurationException) ite; + throw (ConfigurationException) ite; } else { - throw new ConfigurationException(ite.getMessage(), ite.getCause()); + throw new ConfigurationException(ite.getMessage(), ite); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePasswordInitialiser.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePasswordInitialiser.java index 68095de3a0..dd0bd096c3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePasswordInitialiser.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePasswordInitialiser.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -33,14 +33,16 @@ import javax.security.auth.login.AccountNotFoundException; import javax.security.sasl.AuthorizeCallback; import org.apache.commons.configuration.Configuration; + import org.apache.log4j.Logger; + import org.apache.qpid.server.security.auth.database.PrincipalDatabase; import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser; import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; public abstract class UsernamePasswordInitialiser implements AuthenticationProviderInitialiser { - protected static final Logger _logger = Logger.getLogger(UsernamePasswordInitialiser.class); + protected static final Logger _logger = Logger.getLogger(UsernamePasswordInitialiser.class); private ServerCallbackHandler _callbackHandler; @@ -72,7 +74,9 @@ public abstract class UsernamePasswordInitialiser implements AuthenticationProvi { // very annoyingly the callback handler does not throw anything more appropriate than // IOException - throw new IOException("Error looking up user " + e); + IOException ioe = new IOException("Error looking up user " + e); + ioe.initCause(e); + throw ioe; } } else if (callback instanceof AuthorizeCallback) @@ -88,7 +92,7 @@ public abstract class UsernamePasswordInitialiser implements AuthenticationProvi } public void initialise(String baseConfigPath, Configuration configuration, - Map<String, PrincipalDatabase> principalDatabases) throws Exception + Map<String, PrincipalDatabase> principalDatabases) throws Exception { String principalDatabaseName = configuration.getString(baseConfigPath + ".principal-database"); PrincipalDatabase db = principalDatabases.get(principalDatabaseName); @@ -102,6 +106,7 @@ public abstract class UsernamePasswordInitialiser implements AuthenticationProvi { throw new NullPointerException("Cannot initialise with a null Principal database."); } + _callbackHandler = new ServerCallbackHandler(db); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index c24d1aa23a..b5c59dbbb7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -181,7 +181,7 @@ public class VirtualHost implements Accessable catch (Exception e)
{
_logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
- throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
+ throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e);
}
Configurator.configure(instance);
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java index 88bcbbbccb..f3b21e3c64 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java @@ -47,7 +47,9 @@ public class FileMessageFactory } catch (IOException e) { - throw new MessageFactoryException(e.toString()); + MessageFactoryException mfe = new MessageFactoryException(e.toString()); + mfe.initCause(e); + throw mfe; } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java index 8505d1d457..98a2c0d497 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java @@ -59,11 +59,11 @@ public class InitialContextHelper } catch (IOException e) { - throw new ContextException(e.toString()); + throw new ContextException(e.toString(), e); } catch (NamingException n) { - throw new ContextException(n.toString()); + throw new ContextException(n.toString(), n); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 2274f2964f..0e3d99eeba 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,6 +20,29 @@ */ package org.apache.qpid.client; +import java.io.IOException; +import java.net.ConnectException; +import java.nio.channels.UnresolvedAddressException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.*; +import javax.jms.IllegalStateException; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.Referenceable; +import javax.naming.StringRefAddr; + +import org.apache.log4j.Logger; + import org.apache.qpid.AMQConnectionFailureException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; @@ -44,28 +67,6 @@ import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.url.URLSyntaxException; -import org.apache.log4j.Logger; - -import javax.jms.*; -import javax.jms.IllegalStateException; -import javax.naming.NamingException; -import javax.naming.Reference; -import javax.naming.Referenceable; -import javax.naming.StringRefAddr; -import java.io.IOException; -import java.net.ConnectException; -import java.nio.channels.UnresolvedAddressException; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.Executors; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { private static final Logger _logger = Logger.getLogger(AMQConnection.class); @@ -95,7 +96,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private AMQProtocolHandler _protocolHandler; /** Maps from session id (Integer) to AMQSession instance */ - private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap + private final Map _sessions = new LinkedHashMap(); // fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap private String _clientName; @@ -125,15 +126,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /* * _Connected should be refactored with a suitable wait object. - */ + */ private boolean _connected; /* * The last error code that occured on the connection. Used to return the correct exception to the client - */ + */ private AMQException _lastAMQException = null; - /* * The connection meta data */ @@ -161,13 +161,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * @throws AMQException * @throws URLSyntaxException */ - public AMQConnection(String broker, String username, String password, - String clientName, String virtualHost) throws AMQException, URLSyntaxException + public AMQConnection(String broker, String username, String password, String clientName, String virtualHost) + throws AMQException, URLSyntaxException { - this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" + - username + ":" + password + "@" + - (clientName == null ? "" : clientName) + "/" + - virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), null); + this(new AMQConnectionURL( + ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='" + + AMQBrokerDetails.checkTransport(broker) + "'"), null); } /** @@ -180,44 +180,38 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * @throws AMQException * @throws URLSyntaxException */ - public AMQConnection(String broker, String username, String password, - String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException + public AMQConnection(String broker, String username, String password, String clientName, String virtualHost, + SSLConfiguration sslConfig) throws AMQException, URLSyntaxException { - this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" + - username + ":" + password + "@" + - (clientName == null ? "" : clientName) + "/" + - virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig); + this(new AMQConnectionURL( + ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='" + + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig); } - - public AMQConnection(String host, int port, String username, String password, - String clientName, String virtualHost) throws AMQException, URLSyntaxException + public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost) + throws AMQException, URLSyntaxException { this(host, port, false, username, password, clientName, virtualHost, null); } - public AMQConnection(String host, int port, String username, String password, - String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException + public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost, + SSLConfiguration sslConfig) throws AMQException, URLSyntaxException { this(host, port, false, username, password, clientName, virtualHost, sslConfig); } - - public AMQConnection(String host, int port, boolean useSSL, String username, String password, - String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException + public AMQConnection(String host, int port, boolean useSSL, String username, String password, String clientName, + String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException { - this(new AMQConnectionURL(useSSL ? - ConnectionURL.AMQ_PROTOCOL + "://" + - username + ":" + password + "@" + - (clientName == null ? "" : clientName) + - virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'" - + "," + ConnectionURL.OPTIONS_SSL + "='true'" : - ConnectionURL.AMQ_PROTOCOL + "://" + - username + ":" + password + "@" + - (clientName == null ? "" : clientName) + - virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'" - + "," + ConnectionURL.OPTIONS_SSL + "='false'" - ), sslConfig); + this(new AMQConnectionURL( + useSSL + ? (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port + + "'" + "," + ConnectionURL.OPTIONS_SSL + "='true'") + : (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port + + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig); } public AMQConnection(String connection) throws AMQException, URLSyntaxException @@ -230,13 +224,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect this(new AMQConnectionURL(connection), sslConfig); } - public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { if (_logger.isInfoEnabled()) { _logger.info("Connection:" + connectionURL); } + _sslConfiguration = sslConfig; if (connectionURL == null) { @@ -250,7 +244,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _password = connectionURL.getPassword(); setVirtualHost(connectionURL.getVirtualHost()); - if (connectionURL.getDefaultQueueExchangeName() != null) { _defaultQueueExchangeName = connectionURL.getDefaultQueueExchangeName(); @@ -271,7 +264,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName(); } - _failoverPolicy = new FailoverPolicy(connectionURL); _protocolHandler = new AMQProtocolHandler(this); @@ -279,7 +271,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // We are not currently connected _connected = false; - Exception lastException = new Exception(); lastException.initCause(new ConnectException()); @@ -297,7 +288,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (_logger.isInfoEnabled()) { - _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e.getCause()); + _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), + e.getCause()); } } } @@ -323,7 +315,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - if (message == null || message.equals("")) + if ((message == null) || message.equals("")) { message = "Unable to Connect"; } @@ -336,11 +328,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString()); } + e.initCause(lastException); } throw e; } + _connectionMetaData = new QpidConnectionMetaData(this); } @@ -370,6 +364,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { virtualHost = virtualHost.substring(1); } + _virtualHost = virtualHost; } @@ -383,7 +378,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _protocolHandler.attainState(AMQState.CONNECTION_OPEN); _failoverPolicy.attainedConnection(); - //Again this should be changed to a suitable notify + // Again this should be changed to a suitable notify _connected = true; } catch (AMQException e) @@ -402,6 +397,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect try { makeBrokerConnection(bd); + return true; } catch (Exception e) @@ -410,8 +406,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _logger.info("Unable to connect to broker at " + bd); } + attemptReconnection(); } + return false; } @@ -422,6 +420,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect try { makeBrokerConnection(_failoverPolicy.getNextBrokerDetails()); + return true; } catch (Exception e) @@ -437,13 +436,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { if (_logger.isInfoEnabled()) { - _logger.info(e.getMessage() + ":Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails()); + _logger.info(e.getMessage() + ":Unable to connect to broker at " + + _failoverPolicy.getCurrentBrokerDetails()); } } } } - //connection unsuccessful + // connection unsuccessful return false; } @@ -475,14 +475,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH_HIGH_MARK); } - public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, - final int prefetch) throws JMSException + public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch) + throws JMSException { return createSession(transacted, acknowledgeMode, prefetch, prefetch); } public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, - final int prefetchHigh, final int prefetchLow) throws JMSException + final int prefetchHigh, final int prefetchLow) throws JMSException { checkNotClosed(); if (channelLimitReached()) @@ -492,85 +492,81 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect else { return (org.apache.qpid.jms.Session) new FailoverSupport() - { - public Object operation() throws JMSException - { - int channelId = _idFactory.incrementAndGet(); - - if (_logger.isDebugEnabled()) - { - _logger.debug("Write channel open frame for channel id " + channelId); - } - - // We must create the session and register it before actually sending the frame to the server to - // open it, so that there is no window where we could receive data on the channel and not be set - // up to handle it appropriately. - AMQSession session = new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, - prefetchHigh, prefetchLow); - _protocolHandler.addSessionByChannel(channelId, session); - registerSession(channelId, session); - - boolean success = false; - try - { - createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); - success = true; - } - catch (AMQException e) - { - JMSException jmse = new JMSException("Error creating session: " + e); - jmse.setLinkedException(e); - throw jmse; - } - finally - { - if (!success) - { - _protocolHandler.removeSessionByChannel(channelId); - deregisterSession(channelId); - } - } - - if (_started) { - try - { - session.start(); - } - catch (AMQException e) + public Object operation() throws JMSException { - throw new JMSAMQException(e); + int channelId = _idFactory.incrementAndGet(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Write channel open frame for channel id " + channelId); + } + + // We must create the session and register it before actually sending the frame to the server to + // open it, so that there is no window where we could receive data on the channel and not be set + // up to handle it appropriately. + AMQSession session = + new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh, + prefetchLow); + _protocolHandler.addSessionByChannel(channelId, session); + registerSession(channelId, session); + + boolean success = false; + try + { + createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); + success = true; + } + catch (AMQException e) + { + JMSException jmse = new JMSException("Error creating session: " + e); + jmse.setLinkedException(e); + throw jmse; + } + finally + { + if (!success) + { + _protocolHandler.removeSessionByChannel(channelId); + deregisterSession(channelId); + } + } + + if (_started) + { + try + { + session.start(); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + } + + return session; } - } - return session; - } - }.execute(this); + }.execute(this); } } private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) - throws AMQException + throws AMQException { // TODO: Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite( - ChannelOpenBody.createAMQFrame(channelId, - _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), - null), // outOfBand - ChannelOpenOkBody.class); - - //todo send low water mark when protocol allows. - //todo Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite( - BasicQosBody.createAMQFrame(channelId, - _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), - false, // global - prefetchHigh, // prefetchCount - 0), // prefetchSize - BasicQosOkBody.class); + _protocolHandler.syncWrite(ChannelOpenBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion(), null), // outOfBand + ChannelOpenOkBody.class); + + // todo send low water mark when protocol allows. + // todo Be aware of possible changes to parameter order as versions change. + _protocolHandler.syncWrite(BasicQosBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion(), false, // global + prefetchHigh, // prefetchCount + 0), // prefetchSize + BasicQosOkBody.class); if (transacted) { @@ -580,10 +576,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } // TODO: Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, - _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion()), - TxSelectOkBody.class); + _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion()), TxSelectOkBody.class); } } @@ -597,11 +591,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _protocolHandler.removeSessionByChannel(channelId); deregisterSession(channelId); - throw new AMQException("Error reopening channel " + channelId + " after failover: " + e); + throw new AMQException("Error reopening channel " + channelId + " after failover: " + e, e); } } - public void setFailoverPolicy(FailoverPolicy policy) { _failoverPolicy = policy; @@ -646,12 +639,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private boolean channelLimitReached() { - return _maximumChannelCount != 0 && _sessions.size() == _maximumChannelCount; + return (_maximumChannelCount != 0) && (_sessions.size() == _maximumChannelCount); } public String getClientID() throws JMSException { checkNotClosed(); + return _clientName; } @@ -667,6 +661,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public ConnectionMetaData getMetaData() throws JMSException { checkNotClosed(); + return _connectionMetaData; } @@ -674,6 +669,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public ExceptionListener getExceptionListener() throws JMSException { checkNotClosed(); + return _exceptionListener; } @@ -707,6 +703,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect throw new JMSAMQException(e); } } + _started = true; } } @@ -727,6 +724,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect throw new JMSAMQException(e); } } + _started = false; } } @@ -753,7 +751,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { try { - //adjust timeout + // adjust timeout long taskPoolTimeout = adjustTimeout(timeout, startCloseTime); _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS); @@ -764,7 +762,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - //adjust timeout + // adjust timeout timeout = adjustTimeout(timeout, startCloseTime); _protocolHandler.closeConnection(timeout); @@ -772,7 +770,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } catch (AMQException e) { - throw new JMSException("Error closing connection: " + e); + JMSException jmse = new JMSException("Error closing connection: " + e); + jmse.setLinkedException(e); + throw jmse; } } } @@ -786,6 +786,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { timeout = 0; } + return timeout; } @@ -804,6 +805,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect session.markClosed(); } + _sessions.clear(); } @@ -843,6 +845,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } } + _sessions.clear(); if (sessionException != null) { @@ -851,42 +854,42 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, - ServerSessionPool sessionPool, - int maxMessages) throws JMSException + ServerSessionPool sessionPool, int maxMessages) throws JMSException { checkNotClosed(); + return null; } - public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, - ServerSessionPool sessionPool, - int maxMessages) throws JMSException + public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, + int maxMessages) throws JMSException { checkNotClosed(); + return null; } - public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, - ServerSessionPool sessionPool, - int maxMessages) throws JMSException + public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, + int maxMessages) throws JMSException { checkNotClosed(); + return null; } - public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, - String messageSelector, ServerSessionPool sessionPool, - int maxMessages) - throws JMSException + public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, + ServerSessionPool sessionPool, int maxMessages) throws JMSException { // TODO Auto-generated method stub checkNotClosed(); + return null; } public long getMaximumChannelCount() throws JMSException { checkNotClosed(); + return _maximumChannelCount; } @@ -975,6 +978,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { proceed = _connectionListener.preFailover(redirect); } + return proceed; } @@ -995,6 +999,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { markAllSessionsClosed(); } + return resubscribe; } else @@ -1058,12 +1063,15 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { if (cause instanceof AMQException) { - je = new JMSException(Integer.toString(((AMQException) cause).getErrorCode().getCode()), "Exception thrown against " + toString() + ": " + cause); + je = + new JMSException(Integer.toString(((AMQException) cause).getErrorCode().getCode()), + "Exception thrown against " + toString() + ": " + cause); } else { je = new JMSException("Exception thrown against " + toString() + ": " + cause); } + if (cause instanceof Exception) { je.setLinkedException((Exception) cause); @@ -1091,6 +1099,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _logger.info("Closing AMQConnection due to :" + cause.getMessage()); } + _closed.set(true); closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. } @@ -1146,9 +1155,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect buf.append("Host: ").append(String.valueOf(bd.getHost())); buf.append("\nPort: ").append(String.valueOf(bd.getPort())); } + buf.append("\nVirtual Host: ").append(String.valueOf(_virtualHost)); buf.append("\nClient ID: ").append(String.valueOf(_clientName)); - buf.append("\nActive session count: ").append(_sessions == null ? 0 : _sessions.size()); + buf.append("\nActive session count: ").append((_sessions == null) ? 0 : _sessions.size()); + return buf.toString(); } @@ -1159,11 +1170,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public Reference getReference() throws NamingException { - return new Reference( - AMQConnection.class.getName(), - new StringRefAddr(AMQConnection.class.getName(), toURL()), - AMQConnectionFactory.class.getName(), - null); // factory location + return new Reference(AMQConnection.class.getName(), new StringRefAddr(AMQConnection.class.getName(), toURL()), + AMQConnectionFactory.class.getName(), null); // factory location } public SSLConfiguration getSSLConfiguration() @@ -1176,19 +1184,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _defaultTopicExchangeName; } - public void setDefaultTopicExchangeName(AMQShortString defaultTopicExchangeName) { _defaultTopicExchangeName = defaultTopicExchangeName; } - public AMQShortString getDefaultQueueExchangeName() { return _defaultQueueExchangeName; } - public void setDefaultQueueExchangeName(AMQShortString defaultQueueExchangeName) { _defaultQueueExchangeName = defaultQueueExchangeName; @@ -1201,10 +1206,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public AMQShortString getTemporaryQueueExchangeName() { - return _temporaryQueueExchangeName; //To change body of created methods use File | Settings | File Templates. + return _temporaryQueueExchangeName; // To change body of created methods use File | Settings | File Templates. } - public void setTemporaryTopicExchangeName(AMQShortString temporaryTopicExchangeName) { _temporaryTopicExchangeName = temporaryTopicExchangeName; diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 73010ce517..38c1cd8205 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -20,9 +20,9 @@ */ package org.apache.qpid.client; +import java.util.Arrays; import java.util.Iterator; import java.util.List; -import java.util.Arrays; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; @@ -34,6 +34,7 @@ import javax.jms.Message; import javax.jms.MessageListener; import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; @@ -138,10 +139,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private boolean _noConsume; private List<StackTraceElement> _closedStack = null; - protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, - boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, - AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, - int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) + protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, + String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, + AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow, + boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) { _channelId = channelId; _connection = connection; @@ -160,7 +161,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _autoClose = autoClose; _noConsume = noConsume; - //Force queue browsers not to use acknowledge modes. + // Force queue browsers not to use acknowledge modes. if (_noConsume) { _acknowledgeMode = Session.NO_ACKNOWLEDGE; @@ -175,12 +176,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public String getMessageSelector() throws JMSException { checkPreConditions(); + return _messageSelector; } public MessageListener getMessageListener() throws JMSException { checkPreConditions(); + return _messageListener.get(); } @@ -198,14 +201,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { checkPreConditions(); - //if the current listener is non-null and the session is not stopped, then - //it is an error to call this method. + // if the current listener is non-null and the session is not stopped, then + // it is an error to call this method. - //i.e. it is only valid to call this method if + // i.e. it is only valid to call this method if // - // (a) the connection is stopped, in which case the dispatcher is not running - // OR - // (b) the listener is null AND we are not receiving synchronously at present + // (a) the connection is stopped, in which case the dispatcher is not running + // OR + // (b) the listener is null AND we are not receiving synchronously at present // if (!_session.getAMQConnection().started()) @@ -215,7 +218,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_logger.isDebugEnabled()) { - _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " + _destination); + _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " + + _destination); } } else @@ -224,6 +228,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously."); } + if (!_messageListener.compareAndSet(null, messageListener)) { throw new javax.jms.IllegalStateException("Attempt to alter listener while session is started."); @@ -233,7 +238,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (messageListener != null) { - //handle case where connection has already been started, and the dispatcher has alreaded started + // handle case where connection has already been started, and the dispatcher has alreaded started // putting values on the _synchronousQueue synchronized (_session) @@ -263,10 +268,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { throw new javax.jms.IllegalStateException("Another thread is already receiving."); } + if (isMessageListenerSet()) { throw new javax.jms.IllegalStateException("A listener has already been set."); } + _receivingThread = Thread.currentThread(); } @@ -331,6 +338,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { return null; } + Object o = null; if (l > 0) { @@ -340,6 +348,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { o = _synchronousQueue.take(); } + final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) { @@ -352,6 +361,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer catch (InterruptedException e) { _logger.warn("Interrupted: " + e); + return null; } finally @@ -365,6 +375,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty()) { close(false); + return true; } else @@ -387,6 +398,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { return null; } + Object o = _synchronousQueue.poll(); final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) @@ -414,8 +426,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer * @throws JMSException if the argument is a throwable. If it is a JMSException it is rethrown as is, but if not a * JMSException is created with the linked exception set appropriately */ - private AbstractJMSMessage returnMessageOrThrow(Object o) - throws JMSException + private AbstractJMSMessage returnMessageOrThrow(Object o) throws JMSException { // errors are passed via the queue too since there is no way of interrupting the poll() via the API. if (o instanceof Throwable) @@ -425,6 +436,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { e.setLinkedException((Exception) o); } + throw e; } else @@ -433,7 +445,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - public void close() throws JMSException { close(true); @@ -441,7 +452,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public void close(boolean sendClose) throws JMSException { - //synchronized (_closed) + // synchronized (_closed) if (_logger.isInfoEnabled()) { @@ -456,7 +467,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { if (_closedStack != null) { - _logger.trace(_consumerTag + " close():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); + _logger.trace(_consumerTag + " close():" + + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); } else @@ -464,14 +476,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6); } } + if (sendClose) { // TODO: Be aware of possible changes to parameter order as versions change. - final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, - _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), - _consumerTag, // consumerTag - false); // nowait + final AMQFrame cancelFrame = + BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag + false); // nowait try { @@ -485,25 +497,28 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } catch (AMQException e) { - _logger.error("Error closing consumer: " + e, e); - throw new JMSException("Error closing consumer: " + e); + // _logger.error("Error closing consumer: " + e, e); + JMSException jmse = new JMSException("Error closing consumer: " + e); + jmse.setLinkedException(e); + throw jmse; } } else { -// //fixme this probably is not right -// if (!isNoConsume()) - { //done in BasicCancelOK Handler but not sending one so just deregister. + // //fixme this probably is not right + // if (!isNoConsume()) + { // done in BasicCancelOK Handler but not sending one so just deregister. deregisterConsumer(); } } - if (_messageListener != null && _receiving.get()) + if ((_messageListener != null) && _receiving.get()) { if (_logger.isInfoEnabled()) { _logger.info("Interrupting thread: " + _receivingThread); } + _receivingThread.interrupt(); } } @@ -516,7 +531,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer */ void markClosed() { -// synchronized (_closed) + // synchronized (_closed) { _closed.set(true); @@ -524,7 +539,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { if (_closedStack != null) { - _logger.trace(_consumerTag + " markClosed():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); + _logger.trace(_consumerTag + " markClosed():" + + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); } else @@ -533,6 +549,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } } + deregisterConsumer(); } @@ -551,22 +568,22 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _logger.debug("notifyMessage called with message number " + messageFrame.getDeliverBody().deliveryTag); } + try { - AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag, - messageFrame.getDeliverBody().redelivered, - messageFrame.getDeliverBody().exchange, - messageFrame.getDeliverBody().routingKey, - messageFrame.getContentHeader(), - messageFrame.getBodies()); + AbstractJMSMessage jmsMessage = + _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag, + messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange, + messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies()); if (debug) { _logger.debug("Message is of type: " + jmsMessage.getClass().getName()); } -// synchronized (_closed) + // synchronized (_closed) + { -// if (!_closed.get()) + // if (!_closed.get()) { jmsMessage.setConsumer(this); @@ -575,12 +592,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer notifyMessage(jmsMessage, channelId); } -// else -// { -// _logger.error("MESSAGE REJECTING!"); -// _session.rejectMessage(jmsMessage, true); -// //_logger.error("MESSAGE JUST DROPPED!"); -// } + // else + // { + // _logger.error("MESSAGE REJECTING!"); + // _session.rejectMessage(jmsMessage, true); + // //_logger.error("MESSAGE JUST DROPPED!"); + // } } } catch (Exception e) @@ -606,11 +623,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { if (isMessageListenerSet()) { - //we do not need a lock around the test above, and the dispatch below as it is invalid - //for an application to alter an installed listener while the session is started -// synchronized (_closed) + // we do not need a lock around the test above, and the dispatch below as it is invalid + // for an application to alter an installed listener while the session is started + // synchronized (_closed) { -// if (!_closed.get()) + // if (!_closed.get()) { preApplicationProcessing(jmsMessage); @@ -641,14 +658,16 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { switch (_acknowledgeMode) { - case Session.PRE_ACKNOWLEDGE: - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - break; - case Session.CLIENT_ACKNOWLEDGE: - // we set the session so that when the user calls acknowledge() it can call the method on session - // to send out the appropriate frame - msg.setAMQSession(_session); - break; + + case Session.PRE_ACKNOWLEDGE: + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + break; + + case Session.CLIENT_ACKNOWLEDGE: + // we set the session so that when the user calls acknowledge() it can call the method on session + // to send out the appropriate frame + msg.setAMQSession(_session); + break; } } @@ -657,47 +676,56 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer msg.setJMSDestination(_destination); switch (_acknowledgeMode) { - case Session.CLIENT_ACKNOWLEDGE: - if (isNoConsume()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } - break; - case Session.DUPS_OK_ACKNOWLEDGE: - if (++_outstanding >= _prefetchHigh) - { - _dups_ok_acknowledge_send = true; - } - if (_outstanding <= _prefetchLow) - { - _dups_ok_acknowledge_send = false; - } - if (_dups_ok_acknowledge_send) - { - if (!_session.isInRecovery()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), true); - } - } - break; - case Session.AUTO_ACKNOWLEDGE: - // we do not auto ack a message if the application code called recover() + case Session.CLIENT_ACKNOWLEDGE: + if (isNoConsume()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } + + break; + + case Session.DUPS_OK_ACKNOWLEDGE: + if (++_outstanding >= _prefetchHigh) + { + _dups_ok_acknowledge_send = true; + } + + if (_outstanding <= _prefetchLow) + { + _dups_ok_acknowledge_send = false; + } + + if (_dups_ok_acknowledge_send) + { if (!_session.isInRecovery()) { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); + _session.acknowledgeMessage(msg.getDeliveryTag(), true); } - break; - case Session.SESSION_TRANSACTED: - if (isNoConsume()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } - else - { - _receivedDeliveryTags.add(msg.getDeliveryTag()); - } - break; + } + + break; + + case Session.AUTO_ACKNOWLEDGE: + // we do not auto ack a message if the application code called recover() + if (!_session.isInRecovery()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } + + break; + + case Session.SESSION_TRANSACTED: + if (isNoConsume()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } + else + { + _receivedDeliveryTags.add(msg.getDeliveryTag()); + } + + break; } } @@ -721,14 +749,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer void notifyError(Throwable cause) { -// synchronized (_closed) + // synchronized (_closed) { _closed.set(true); if (_logger.isTraceEnabled()) { if (_closedStack != null) { - _logger.trace(_consumerTag + " notifyError():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); + _logger.trace(_consumerTag + " notifyError():" + + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); _logger.trace(_consumerTag + " previously" + _closedStack.toString()); } else @@ -737,7 +766,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } } - //QPID-293 can "request redelivery of this error through dispatcher" + // QPID-293 can "request redelivery of this error through dispatcher" // we have no way of propagating the exception to a message listener - a JMS limitation - so we // deal with the case where we have a synchronous receive() waiting for a message to arrive @@ -749,10 +778,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _logger.debug("Passed exception to synchronous queue for propagation to receive()"); } } + deregisterConsumer(); } - /** * Perform cleanup to deregister this consumer. This occurs when closing the consumer in both the clean case and in * the case of an error occurring. @@ -782,7 +811,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer this.checkNotClosed(); - if (_session == null || _session.isClosed()) + if ((_session == null) || _session.isClosed()) { throw new javax.jms.IllegalStateException("Invalid Session"); } @@ -817,7 +846,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer return _autoClose; } - public boolean isNoConsume() { return _noConsume; @@ -827,10 +855,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _closeWhenNoMessages = b; - if (_closeWhenNoMessages - && _synchronousQueue.isEmpty() - && _receiving.get() - && _messageListener != null) + if (_closeWhenNoMessages && _synchronousQueue.isEmpty() && _receiving.get() && (_messageListener != null)) { _receivingThread.interrupt(); } @@ -846,13 +871,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _logger.debug("Rejecting received messages in _receivedDTs (RQ)"); } - //rollback received but not committed messages + // rollback received but not committed messages while (!_receivedDeliveryTags.isEmpty()) { if (_logger.isDebugEnabled()) { - _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)" + - "for consumer with tag:" + _consumerTag); + _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)" + + "for consumer with tag:" + _consumerTag); } Long tag = _receivedDeliveryTags.poll(); @@ -876,14 +901,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - //rollback pending messages + // rollback pending messages if (_synchronousQueue.size() > 0) { if (_logger.isDebugEnabled()) { - _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)" + - "for consumer with tag:" + _consumerTag); + _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)" + + "for consumer with tag:" + _consumerTag); } + Iterator iterator = _synchronousQueue.iterator(); while (iterator.hasNext()) @@ -898,13 +924,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _logger.trace("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag()); } + iterator.remove(); } else { - _logger.error("Queue contained a :" + o.getClass() + - " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); + _logger.error("Queue contained a :" + o.getClass() + + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); iterator.remove(); } } @@ -919,7 +946,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - public String debugIdentity() { return String.valueOf(_consumerTag); diff --git a/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java index d2ab6bd2c2..d1237cff49 100644 --- a/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java +++ b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java @@ -1,5 +1,25 @@ /* * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +/* + * * Copyright (c) 2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -22,10 +42,35 @@ import javax.jms.JMSException; import org.apache.qpid.AMQException; /** + * JMSException does not accept wrapped exceptions in its constructor. Presumably this is because it is a relatively old + * Java exception class, before this was added as a default to Throwable. This exception class accepts wrapped exceptions + * as well as error messages, through its constructor, but is a JMSException. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Accept wrapped exceptions as a JMSException. + * </table> + * * @author Apache Software Foundation */ public class JMSAMQException extends JMSException { + /** + * Creates a JMSException, wrapping another exception class. + * + * @param message The error message. + * @param cause The underlying exception that caused this one. May be null if none is to be set. + */ + public JMSAMQException(String message, Exception cause) + { + super(message); + + if (cause != null) + { + setLinkedException(cause); + } + } + public JMSAMQException(AMQException s) { super(s.getMessage(), String.valueOf(s.getErrorCode())); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java index 8938130417..af254fbbaf 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java @@ -27,6 +27,7 @@ import javax.jms.JMSException; import javax.jms.MessageEOFException; import org.apache.mina.common.ByteBuffer; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -72,7 +73,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage } AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, - AMQShortString routingKey, ByteBuffer data) throws AMQException + AMQShortString routingKey, ByteBuffer data) throws AMQException { // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data); @@ -93,7 +94,9 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage } catch (IOException e) { - throw new JMSException(e.toString()); + JMSException jmse = new JMSException(e.toString()); + jmse.setLinkedException(e); + throw jmse; } } @@ -112,6 +115,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage { return null; } + int pos = _data.position(); _data.rewind(); // one byte left is for the end of frame marker @@ -119,12 +123,14 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage { // this is really redundant since pos must be zero _data.position(pos); + return null; } else { String data = _data.getString(Charset.forName("UTF8").newDecoder()); _data.position(pos); + return data; } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 41a143c544..f87b4027f6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -33,12 +33,7 @@ import javax.jms.MessageNotWriteableException; import org.apache.commons.collections.map.ReferenceMap; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.client.AMQUndefinedDestination; -import org.apache.qpid.client.BasicMessageConsumer; -import org.apache.qpid.client.CustomJMSXProperty; +import org.apache.qpid.client.*; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; @@ -184,7 +179,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach } catch (URLSyntaxException e) { - throw new JMSException("Illegal value in JMS_ReplyTo property: " + replyToEncoding); + throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e); } _destinationCache.put(replyToEncoding, dest); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java index 6352f7029f..348a0bd152 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java @@ -384,7 +384,9 @@ public final class JMSHeaderAdapter }
catch (AMQPInvalidClassException aice)
{
- throw new MessageFormatException("Only primatives are allowed object is:" + object.getClass());
+ MessageFormatException mfe = new MessageFormatException("Only primatives are allowed object is:" + object.getClass());
+ mfe.setLinkedException(aice);
+ throw mfe;
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java index df1400b167..caf8741280 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java @@ -33,6 +33,7 @@ import javax.jms.MessageFormatException; import javax.jms.ObjectMessage; import org.apache.mina.common.ByteBuffer; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -61,14 +62,15 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); _data.setAutoExpand(true); } + getContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING); } /** * Creates read only message for delivery to consumers */ - JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, - AMQShortString routingKey, ByteBuffer data) throws AMQException + JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey, + ByteBuffer data) throws AMQException { super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data); } @@ -79,6 +81,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag { _data.release(); } + _data = null; } @@ -116,11 +119,13 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag } catch (IOException e) { - throw new MessageFormatException("Message not serializable: " + e); + MessageFormatException mfe = new MessageFormatException("Message not serializable: " + e); + mfe.setLinkedException(e); + throw mfe; } } - + public Serializable getObject() throws JMSException { ObjectInputStream in = null; @@ -133,17 +138,20 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag { _data.rewind(); in = new ObjectInputStream(_data.asInputStream()); + return (Serializable) in.readObject(); } catch (IOException e) { - e.printStackTrace(); - throw new MessageFormatException("Could not deserialize message: " + e); + MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e); + mfe.setLinkedException(e); + throw mfe; } catch (ClassNotFoundException e) { - e.printStackTrace(); - throw new MessageFormatException("Could not deserialize message: " + e); + MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e); + mfe.setLinkedException(e); + throw mfe; } finally { @@ -162,8 +170,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag } } catch (IOException ignore) - { - } + { } } private static String toString(ByteBuffer data) @@ -172,6 +179,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag { return null; } + int pos = data.position(); try { diff --git a/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java b/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java index 4504498308..dcca55e6c2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java @@ -31,15 +31,16 @@ import javax.security.auth.callback.PasswordCallback; import javax.security.auth.callback.UnsupportedCallbackException; import javax.security.sasl.RealmCallback; -import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.log4j.Logger; import com.sun.crypto.provider.HmacMD5; +import org.apache.log4j.Logger; + +import org.apache.qpid.client.protocol.AMQProtocolSession; + public class UsernameHashedPasswordCallbackHandler implements AMQCallbackHandler { private static final Logger _logger = Logger.getLogger(UsernameHashedPasswordCallbackHandler.class); - private AMQProtocolSession _protocolSession; public void initialise(AMQProtocolSession protocolSession) @@ -58,14 +59,15 @@ public class UsernameHashedPasswordCallbackHandler implements AMQCallbackHandler } else if (cb instanceof PasswordCallback) { - try { ((PasswordCallback) cb).setPassword(getHash(_protocolSession.getPassword())); } - catch (Exception e) + catch (NoSuchAlgorithmException e) { - throw new UnsupportedCallbackException(cb); + UnsupportedCallbackException uce = new UnsupportedCallbackException(cb); + uce.initCause(e); + throw uce; } } else diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java b/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java index 104c5bfc44..1ec3adc2eb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -33,7 +33,7 @@ public class AMQNoTransportForProtocolException extends AMQTransportConnectionEx public AMQNoTransportForProtocolException(BrokerDetails details, String message) { - super(message); + super(null, message, null); _details = details; } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java b/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java index 4b17661bc3..fec7ff693c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,12 +21,12 @@ package org.apache.qpid.client.transport; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; public class AMQTransportConnectionException extends AMQException { - public AMQTransportConnectionException(String message) + public AMQTransportConnectionException(AMQConstant errorCode, String message, Throwable cause) { - super(message); - + super(errorCode, message, cause); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 8368eee125..0bc83e9804 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,12 +26,14 @@ import java.util.Iterator; import java.util.Map; import org.apache.log4j.Logger; + import org.apache.mina.common.IoConnector; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoServiceConfig; import org.apache.mina.transport.socket.nio.SocketConnector; import org.apache.mina.transport.vmpipe.VmPipeAcceptor; import org.apache.mina.transport.vmpipe.VmPipeAddress; + import org.apache.qpid.client.AMQBrokerDetails; import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.jms.BrokerDetails; @@ -64,13 +66,11 @@ public class TransportConnection int transport = getTransport(details.getTransport()); if (transport == -1) - { throw new AMQNoTransportForProtocolException(details); } if (transport == _currentInstance) - { if (transport == VM) { @@ -88,40 +88,42 @@ public class TransportConnection _currentInstance = transport; switch (transport) - { - case TCP: - _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() - { - public IoConnector newSocketConnector() + + case TCP: + _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() { - SocketConnector result; - //FIXME - this needs to be sorted to use the new Mina MultiThread SA. - if (Boolean.getBoolean("qpidnio")) + public IoConnector newSocketConnector() { - _logger.fatal("Using Qpid NIO - sysproperty 'qpidnio' is set."); -// result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector + SocketConnector result; + // FIXME - this needs to be sorted to use the new Mina MultiThread SA. + if (Boolean.getBoolean("qpidnio")) + { + _logger.fatal("Using Qpid NIO - sysproperty 'qpidnio' is set."); + // result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector + } + // else + + { + _logger.info("Using Mina NIO"); + result = new SocketConnector(); // non-blocking connector + } + + // Don't have the connector's worker thread wait around for other connections (we only use + // one SocketConnector per connection at the moment anyway). This allows short-running + // clients (like unit tests) to complete quickly. + result.setWorkerTimeout(0); + + return result; } -// else - { - _logger.info("Using Mina NIO"); - result = new SocketConnector(); // non-blocking connector - } - - // Don't have the connector's worker thread wait around for other connections (we only use - // one SocketConnector per connection at the moment anyway). This allows short-running - // clients (like unit tests) to complete quickly. - result.setWorkerTimeout(0); + }); + break; - return result; - } - }); - break; - case VM: - { - _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); - break; - } + case VM: + { + _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); + break; + } } return _instance; @@ -142,7 +144,8 @@ public class TransportConnection return -1; } - private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate) throws AMQVMBrokerCreationException + private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate) + throws AMQVMBrokerCreationException { int port = details.getPort(); @@ -154,14 +157,14 @@ public class TransportConnection } else { - throw new AMQVMBrokerCreationException(port, "VM Broker on port " + port + " does not exist. Auto create disabled."); + throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port + + " does not exist. Auto create disabled.", null); } } return new VmPipeTransportConnection(port); } - public static void createVMBroker(int port) throws AMQVMBrokerCreationException { if (_acceptor == null) @@ -192,7 +195,7 @@ public class TransportConnection { _logger.error(e); - //Try and unbind provider + // Try and unbind provider try { VmPipeAddress pipe = new VmPipeAddress(port); @@ -203,7 +206,7 @@ public class TransportConnection } catch (Exception ignore) { - //ignore + // ignore } if (provider == null) @@ -227,7 +230,7 @@ public class TransportConnection because = e.getCause().toString(); } - throw new AMQVMBrokerCreationException(port, because + " Stopped binding of InVM Qpid.AMQP"); + throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e); } } } @@ -246,14 +249,14 @@ public class TransportConnection // can't use introspection to get Provider as it is a server class. // need to go straight to IoHandlerAdapter but that requries the queues and exchange from the ApplicationRegistry which we can't access. - //get right constructor and pass in instancec ID - "port" + // get right constructor and pass in instancec ID - "port" IoHandlerAdapter provider; try { - Class[] cnstr = {Integer.class}; - Object[] params = {port}; + Class[] cnstr = { Integer.class }; + Object[] params = { port }; provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params); - //Give the broker a second to create + // Give the broker a second to create _logger.info("Created VMBroker Instance:" + port); } catch (Exception e) @@ -270,8 +273,10 @@ public class TransportConnection because = e.getCause().toString(); } - - throw new AMQVMBrokerCreationException(port, because + " Stopped InVM Qpid.AMQP creation"); + AMQVMBrokerCreationException amqbce = + new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null); + amqbce.initCause(e); + throw amqbce; } return provider; diff --git a/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java b/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java index 607ddcc26a..4b2982fe9c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java +++ b/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,19 +21,25 @@ package org.apache.qpid.client.vmbroker; import org.apache.qpid.client.transport.AMQTransportConnectionException; +import org.apache.qpid.protocol.AMQConstant; public class AMQVMBrokerCreationException extends AMQTransportConnectionException { private int _port; + /** + * @param port + * + * @deprecated + */ public AMQVMBrokerCreationException(int port) { - this(port, "Unable to create vm broker"); + this(null, port, "Unable to create vm broker", null); } - public AMQVMBrokerCreationException(int port, String message) + public AMQVMBrokerCreationException(AMQConstant errorCode, int port, String message, Throwable cause) { - super(message); + super(errorCode, message, cause); _port = port; } diff --git a/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java b/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java index 9adf04e182..6ad3fb4bae 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java +++ b/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java @@ -101,7 +101,7 @@ public class FailoverPolicy } catch (Exception cnfe) { - throw new IllegalArgumentException("Unknown failover method:" + failoverMethod); + throw new IllegalArgumentException("Unknown failover method:" + failoverMethod, cnfe); } } } diff --git a/java/client/src/test/java/org/apache/qpid/testutil/Config.java b/java/client/src/test/java/org/apache/qpid/testutil/Config.java index 8109d20a33..b777cf93b6 100644 --- a/java/client/src/test/java/org/apache/qpid/testutil/Config.java +++ b/java/client/src/test/java/org/apache/qpid/testutil/Config.java @@ -172,7 +172,7 @@ public class Config } catch(NumberFormatException e) { - throw new RuntimeException("Bad port number: " + value); + throw new RuntimeException("Bad port number: " + value, e); } } else if("-name".equalsIgnoreCase(key)) diff --git a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java index f2afa472ab..195ed79dab 100644 --- a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java +++ b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java @@ -3,6 +3,7 @@ package org.apache.qpid.testutil; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQConnectionURL; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.JMSAMQException; import org.apache.qpid.url.URLSyntaxException; import org.apache.log4j.Logger; @@ -70,7 +71,7 @@ public class QpidClientConnection implements ExceptionListener } catch (URLSyntaxException e) { - throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage()); + throw new JMSAMQException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage(), e); } } } diff --git a/java/common/src/main/java/org/apache/qpid/AMQException.java b/java/common/src/main/java/org/apache/qpid/AMQException.java index 32c1e76a39..2f04a01f53 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQException.java @@ -22,11 +22,41 @@ package org.apache.qpid; import org.apache.qpid.protocol.AMQConstant; -/** Generic AMQ exception. */ +/** + * AMQException forms the root exception of all exceptions relating to the AMQ protocol. It provides space to associate + * an AMQ error code with the exception, which is a numberic value, with a meaning defined by the protocol. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Represents an exception condition associated with an AMQ protocol error code. + * </table> + * + * @todo This exception class is also used as a generic exception throughout Qpid code. This usage may not be strictly + * correct if this is to signify a protocol exception. Should review. + */ public class AMQException extends Exception { + /** Holds the AMQ error code constant associated with this exception. */ private AMQConstant _errorCode; + /** + * Creates an exception with an optional error code, optional message and optional underlying cause. + * + * @param errorCode The error code. May be null if not to be set. + * @param msg The exception message. May be null if not to be set. + * @param t The underlying cause of the exception. May be null if not to be set. + */ + public AMQException(AMQConstant errorCode, String msg, Throwable t) + { + super(((msg == null) ? "" : msg) + ((errorCode == null) ? "" : (" [error code " + errorCode + "]")), t); + _errorCode = errorCode; + } + + /** + * @param message + * + * @deprecated Use {@link #AMQException(org.apache.qpid.protocol.AMQConstant, String, Throwable)} instead. + */ public AMQException(String message) { super(message); @@ -34,6 +64,12 @@ public class AMQException extends Exception _errorCode = AMQConstant.getConstant(-1); } + /** + * @param msg + * @param t + * + * @deprecated Use {@link #AMQException(org.apache.qpid.protocol.AMQConstant, String, Throwable)} instead. + */ public AMQException(String msg, Throwable t) { super(msg, t); @@ -41,21 +77,25 @@ public class AMQException extends Exception _errorCode = AMQConstant.getConstant(-1); } - public AMQException(AMQConstant errorCode, String msg, Throwable t) - { - super(msg + " [error code " + errorCode + ']', t); - _errorCode = errorCode; - } - + /** + * @param errorCode + * @param msg + * + * @deprecated Use {@link #AMQException(org.apache.qpid.protocol.AMQConstant, String, Throwable)} instead. + */ public AMQException(AMQConstant errorCode, String msg) { super(msg + " [error code " + errorCode + ']'); _errorCode = errorCode; } + /** + * Gets the AMQ protocol exception code associated with this exception. + * + * @return The AMQ protocol exception code associated with this exception. + */ public AMQConstant getErrorCode() { return _errorCode; } - } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index 43f888c029..9f36448986 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -94,7 +94,7 @@ public class AMQDataBlockDecoder if(bodyFactory == null) { - throw new AMQFrameDecodingException("Unsupported frame type: " + type); + throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null); } final int channel = in.getUnsignedShort(); @@ -103,8 +103,8 @@ public class AMQDataBlockDecoder // bodySize can be zero if (channel < 0 || bodySize < 0) { - throw new AMQFrameDecodingException("Undecodable frame: type = " + type + " channel = " + channel + - " bodySize = " + bodySize); + throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type + " channel = " + channel + + " bodySize = " + bodySize, null); } AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory); @@ -113,7 +113,7 @@ public class AMQDataBlockDecoder byte marker = in.get(); if ((marker & 0xFF) != 0xCE) { - throw new AMQFrameDecodingException("End of frame marker not found. Read " + marker + " length=" + bodySize + " type=" + type); + throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize + " type=" + type, null); } return frame; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java index a3d4513240..171da76771 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java @@ -21,16 +21,17 @@ package org.apache.qpid.framing; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; public class AMQFrameDecodingException extends AMQException { - public AMQFrameDecodingException(String message) + /*public AMQFrameDecodingException(String message) { super(message); - } + }*/ - public AMQFrameDecodingException(String message, Throwable t) + public AMQFrameDecodingException(AMQConstant errorCode, String message, Throwable t) { - super(message, t); + super(errorCode, message, t); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java index 8b784fa3f7..008afb490e 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java @@ -341,7 +341,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti } catch (AMQFrameDecodingException e) { - throw new RuntimeException("Error in content header data: " + e); + throw new RuntimeException("Error in content header data: " + e, e); } final int endPos = buffer.position(); @@ -381,7 +381,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti } catch (AMQFrameDecodingException e) { - throw new RuntimeException("Error in content header data: " + e); + throw new RuntimeException("Error in content header data: " + e, e); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java index 7dac018872..712eb437db 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java @@ -49,7 +49,7 @@ public class ContentHeaderPropertiesFactory } else { - throw new AMQFrameDecodingException("Unsupport content header class id: " + classId); + throw new AMQFrameDecodingException(null, "Unsupport content header class id: " + classId, null); } properties.populatePropertiesFromBuffer(buffer, propertyFlags, size); return properties; diff --git a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java index fcd8d47d32..916b476185 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java +++ b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java @@ -152,31 +152,31 @@ public class VersionSpecificRegistry }
catch (NullPointerException e)
{
- throw new AMQFrameDecodingException("Class " + classID + " unknown in AMQP version " + _protocolMajorVersion
- + "-" + _protocolMinorVersion + " (while trying to decode class " + classID + " method " + methodID + ".");
+ throw new AMQFrameDecodingException(null, "Class " + classID + " unknown in AMQP version " + _protocolMajorVersion
+ + "-" + _protocolMinorVersion + " (while trying to decode class " + classID + " method " + methodID + ".", e);
}
catch (IndexOutOfBoundsException e)
{
if (classID >= _registry.length)
{
- throw new AMQFrameDecodingException("Class " + classID + " unknown in AMQP version " + _protocolMajorVersion
+ throw new AMQFrameDecodingException(null, "Class " + classID + " unknown in AMQP version " + _protocolMajorVersion
+ "-" + _protocolMinorVersion + " (while trying to decode class " + classID + " method " + methodID
- + ".");
+ + ".", e);
}
else
{
- throw new AMQFrameDecodingException("Method " + methodID + " unknown in AMQP version "
+ throw new AMQFrameDecodingException(null, "Method " + methodID + " unknown in AMQP version "
+ _protocolMajorVersion + "-" + _protocolMinorVersion + " (while trying to decode class " + classID
- + " method " + methodID + ".");
+ + " method " + methodID + ".", e);
}
}
if (bodyFactory == null)
{
- throw new AMQFrameDecodingException("Method " + methodID + " unknown in AMQP version " + _protocolMajorVersion
- + "-" + _protocolMinorVersion + " (while trying to decode class " + classID + " method " + methodID + ".");
+ throw new AMQFrameDecodingException(null, "Method " + methodID + " unknown in AMQP version " + _protocolMajorVersion
+ + "-" + _protocolMinorVersion + " (while trying to decode class " + classID + " method " + methodID + ".", null);
}
return bodyFactory.newInstance(_protocolMajorVersion, _protocolMinorVersion, classID, methodID, in, size);
diff --git a/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java b/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java index 04381d66a0..14db74438f 100644 --- a/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java +++ b/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java @@ -49,7 +49,7 @@ public abstract class AbstractConfig } catch(NumberFormatException e) { - throw new RuntimeException(msg + ": " + i); + throw new RuntimeException(msg + ": " + i, e); } } @@ -61,7 +61,7 @@ public abstract class AbstractConfig } catch(NumberFormatException e) { - throw new RuntimeException(msg + ": " + i); + throw new RuntimeException(msg + ": " + i, e); } } diff --git a/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java b/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java index 44285efd96..a0248a8f79 100644 --- a/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java +++ b/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java @@ -22,6 +22,7 @@ package org.apache.qpid.config; import org.apache.qpid.config.ConnectionFactoryInitialiser; import org.apache.qpid.config.ConnectorConfig; +import org.apache.qpid.client.JMSAMQException; import javax.jms.ConnectionFactory; import javax.jms.JMSException; @@ -63,11 +64,11 @@ public class JBossConnectionFactoryInitialiser implements ConnectionFactoryIniti } catch (NamingException e) { - throw new JMSException("Unable to lookup object: " + e); + throw new JMSAMQException("Unable to lookup object: " + e, e); } catch (Exception e) { - throw new JMSException("Error creating topic: " + e); + throw new JMSAMQException("Error creating topic: " + e, e); } } diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/Config.java b/java/perftests/src/main/java/org/apache/qpid/topic/Config.java index 342b28ca17..d5c0979399 100644 --- a/java/perftests/src/main/java/org/apache/qpid/topic/Config.java +++ b/java/perftests/src/main/java/org/apache/qpid/topic/Config.java @@ -221,7 +221,7 @@ public class Config extends AbstractConfig implements ConnectorConfig } catch(NumberFormatException e) { - throw new RuntimeException("Bad port number: " + value); + throw new RuntimeException("Bad port number: " + value, e); } } else if("-payload".equalsIgnoreCase(key)) diff --git a/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java index 53c4e7bb8f..80773c102d 100644 --- a/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java +++ b/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java @@ -3,6 +3,7 @@ package org.apache.qpid.testutil; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQConnectionURL; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.JMSAMQException; import org.apache.qpid.url.URLSyntaxException; import org.apache.log4j.Logger; @@ -70,7 +71,7 @@ public class QpidClientConnection implements ExceptionListener } catch (URLSyntaxException e) { - throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage()); + throw new JMSAMQException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage(), e); } } } |