diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2009-10-23 16:11:43 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2009-10-23 16:11:43 +0000 |
commit | a7fe503beac48244dbbc06878821c603ca09acd3 (patch) | |
tree | 6d76c926f5cb286ec352cb93fea17b61bc73070d | |
parent | 2aff2a16161c1912aa355dd88696ebf5f48317f4 (diff) | |
download | qpid-python-a7fe503beac48244dbbc06878821c603ca09acd3.tar.gz |
Get all protocols running on the same port
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@829116 13f79535-47bb-0310-9956-ffa450edef68
10 files changed, 640 insertions, 101 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java index 326352b644..0a2bd6e8b0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -26,6 +26,11 @@ import java.io.InputStream; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Properties; +import java.util.Set; +import java.util.HashSet; +import java.util.List; +import java.util.Collection; +import java.util.Arrays; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.HelpFormatter; @@ -37,8 +42,6 @@ import org.apache.commons.cli.PosixParser; import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; import org.apache.log4j.xml.QpidLog4JConfigurator; -import org.apache.qpid.AMQException; -import org.apache.qpid.transport.network.io.IoTransport; import org.apache.qpid.transport.network.ConnectionBinding; import org.apache.qpid.transport.*; import org.apache.qpid.common.QpidProperties; @@ -52,15 +55,16 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.management.LoggingManagementMBean; import org.apache.qpid.server.logging.messages.BrokerMessages; import org.apache.qpid.server.protocol.AMQProtocolEngineFactory; +import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.transport.ServerConnection; import org.apache.qpid.server.transport.QpidAcceptor; import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.network.mina.MINANetworkDriver; +import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION; /** * Main entry point for AMQPD. @@ -79,6 +83,7 @@ public class Main private static final int IPV4_ADDRESS_LENGTH = 4; private static final char IPV4_LITERAL_SEPARATOR = '.'; + private static final Collection<VERSION> ALL_VERSIONS = Arrays.asList(VERSION.values()); protected static class InitException extends Exception { @@ -129,6 +134,24 @@ public class Main OptionBuilder.withArgName("port").hasArg() .withDescription("listen on the specified port. Overrides any value in the config file") .withLongOpt("port").create("p"); + + Option exclude0_10 = + OptionBuilder.withArgName("exclude-0-10").hasArg() + .withDescription("when listening on the specified port do not accept AMQP0-10 connections. The specified port must be one specified on the command line") + .withLongOpt("exclude-0-10").create(); + + Option exclude0_9 = + OptionBuilder.withArgName("exclude-0-9").hasArg() + .withDescription("when listening on the specified port do not accept AMQP0-9 connections. The specified port must be one specified on the command line") + .withLongOpt("exclude-0-9").create(); + + + Option exclude0_8 = + OptionBuilder.withArgName("exclude-0-8").hasArg() + .withDescription("when listening on the specified port do not accept AMQP0-8 connections. The specified port must be one specified on the command line") + .withLongOpt("exclude-0-8").create(); + + Option mport = OptionBuilder.withArgName("mport").hasArg() .withDescription("listen on the specified management port. Overrides any value in the config file") @@ -155,6 +178,9 @@ public class Main options.addOption(logconfig); options.addOption(logwatchconfig); options.addOption(port); + options.addOption(exclude0_10); + options.addOption(exclude0_9); + options.addOption(exclude0_8); options.addOption(mport); options.addOption(bind); } @@ -303,23 +329,35 @@ public class Main _brokerLogger.info("Starting Qpid Broker " + QpidProperties.getReleaseVersion() + " build: " + QpidProperties.getBuildVersion()); - int port = serverConfig.getPort(); - String portStr = commandLine.getOptionValue("p"); - if (portStr != null) + + String[] portStr = commandLine.getOptionValues("p"); + + Set<Integer> ports = new HashSet<Integer>(); + Set<Integer> exclude_0_10 = new HashSet<Integer>(); + Set<Integer> exclude_0_9 = new HashSet<Integer>(); + Set<Integer> exclude_0_8 = new HashSet<Integer>(); + + if(portStr == null || portStr.length == 0) { - try - { - port = Integer.parseInt(portStr); - } - catch (NumberFormatException e) - { - throw new InitException("Invalid port: " + portStr, e); - } + + parsePortList(ports, serverConfig.getPorts()); + parsePortList(exclude_0_10, serverConfig.getPortExclude010()); + parsePortList(exclude_0_9, serverConfig.getPortExclude09()); + parsePortList(exclude_0_8, serverConfig.getPortExclude08()); + } + else + { + parsePortArray(ports, portStr); + parsePortArray(exclude_0_10, commandLine.getOptionValues("exclude-0-10")); + parsePortArray(exclude_0_9, commandLine.getOptionValues("exclude-0-9")); + parsePortArray(exclude_0_8, commandLine.getOptionValues("exclude-0-8")); + + } + + - //TODO - HACK - port += 10; String bindAddr = commandLine.getOptionValue("b"); if (bindAddr == null) @@ -327,15 +365,21 @@ public class Main bindAddr = serverConfig.getBind(); } InetAddress bindAddress = null; + + + if (bindAddr.equals("wildcard")) { - bindAddress = new InetSocketAddress(port).getAddress(); + bindAddress = new InetSocketAddress(0).getAddress(); } else { bindAddress = InetAddress.getByAddress(parseIP(bindAddr)); } + String hostName = bindAddress.getCanonicalHostName(); + + String keystorePath = serverConfig.getKeystorePath(); String keystorePassword = serverConfig.getKeystorePassword(); String certType = serverConfig.getCertType(); @@ -343,12 +387,40 @@ public class Main if (!serverConfig.getSSLOnly()) { - NetworkDriver driver = new MINANetworkDriver(); - driver.bind(port, new InetAddress[]{bindAddress}, new AMQProtocolEngineFactory(), - serverConfig.getNetworkConfiguration(), null); - ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), - new QpidAcceptor(driver,"TCP")); - CurrentActor.get().message(BrokerMessages.BRK_1002("TCP", port)); + + for(int port : ports) + { + + NetworkDriver driver = new MINANetworkDriver(); + + Set<VERSION> supported = new HashSet<VERSION>(ALL_VERSIONS); + + if(exclude_0_10.contains(port)) + { + supported.remove(VERSION.v0_10); + } + if(exclude_0_9.contains(port)) + { + supported.remove(VERSION.v0_9); + } + if(exclude_0_8.contains(port)) + { + supported.remove(VERSION.v0_8); + } + + MultiVersionProtocolEngineFactory protocolEngineFactory = + new MultiVersionProtocolEngineFactory(hostName, supported); + + + + driver.bind(port, new InetAddress[]{bindAddress}, protocolEngineFactory, + serverConfig.getNetworkConfiguration(), null); + ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), + new QpidAcceptor(driver,"TCP")); + CurrentActor.get().message(BrokerMessages.BRK_1002("TCP", port)); + + } + } if (serverConfig.getEnableSSL()) @@ -357,7 +429,7 @@ public class Main NetworkDriver driver = new MINANetworkDriver(); driver.bind(serverConfig.getSSLPort(), new InetAddress[]{bindAddress}, new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory); - ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), + ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, serverConfig.getSSLPort()), new QpidAcceptor(driver,"TCP")); CurrentActor.get().message(BrokerMessages.BRK_1002("TCP/SSL", serverConfig.getSSLPort())); } @@ -366,61 +438,55 @@ public class Main _brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion() + " build: " + QpidProperties.getBuildVersion()); + CurrentActor.get().message(BrokerMessages.BRK_1004()); - int port_0_10 = port - 10; - - IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); - final ConnectionDelegate delegate = - new org.apache.qpid.server.transport.ServerConnectionDelegate(appRegistry, bindAddress.getCanonicalHostName()); - - - - -/* - NetworkDriver driver = new MINANetworkDriver(); - driver.bind(port_0_10, new InetAddress[]{bindAddress}, new ProtocolEngineFactory_0_10(delegate), - serverConfig.getNetworkConfiguration(), null); - ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), - new QpidAcceptor(driver,"TCP")); - CurrentActor.get().message(BrokerMessages.BRK_1002("TCP", port)); -*/ - - - - - - // TODO - Fix to use a proper binding - - + } + finally + { + // Startup is complete so remove the AR initialised Startup actor + CurrentActor.remove(); + } + } - ConnectionBinding cb = new ConnectionBinding() + private void parsePortArray(Set<Integer> ports, String[] portStr) + throws InitException + { + if(portStr != null) + { + for(int i = 0; i < portStr.length; i++) { - public Connection connection() + try { - ServerConnection conn = new ServerConnection(); - conn.setConnectionDelegate(delegate); - return conn; + ports.add(Integer.parseInt(portStr[i])); } - }; - - org.apache.qpid.transport.network.io.IoAcceptor ioa = new org.apache.qpid.transport.network.io.IoAcceptor - ("0.0.0.0", port_0_10, cb); - ioa.start(); - - CurrentActor.get().message(BrokerMessages.BRK_1004()); - + catch (NumberFormatException e) + { + throw new InitException("Invalid port: " + portStr[i], e); + } + } } - finally + } + + private void parsePortList(Set<Integer> output, List input) + throws InitException + { + if(input != null) { - // Startup is complete so remove the AR initialised Startup actor - CurrentActor.remove(); + for(Object port : input) + { + try + { + output.add(Integer.parseInt(String.valueOf(port))); + } + catch (NumberFormatException e) + { + throw new InitException("Invalid port: " + port, e); + } + } } - - - } /** diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index 566f5a2de1..7bf28c7560 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Locale; +import java.util.Collections; import java.util.Map.Entry; import org.apache.commons.configuration.CompositeConfiguration; @@ -524,11 +525,27 @@ public class ServerConfiguration implements SignalHandler return getConfig().getInt("connector.processors", 4); } - public int getPort() + public List getPorts() { - return getConfig().getInt("connector.port", DEFAULT_PORT); + return getConfig().getList("connector.port", Collections.singletonList(DEFAULT_PORT)); } + public List getPortExclude010() + { + return getConfig().getList("connector.non010port", Collections.EMPTY_LIST); + } + + public List getPortExclude09() + { + return getConfig().getList("connector.non09port", Collections.EMPTY_LIST); + } + + public List getPortExclude08() + { + return getConfig().getList("connector.non08port", Collections.EMPTY_LIST); + } + + public String getBind() { return getConfig().getString("connector.bind", "wildcard"); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java new file mode 100755 index 0000000000..78e21a8f14 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -0,0 +1,366 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.protocol; + +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionDelegate; +import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION; +import org.apache.qpid.server.transport.ServerConnection; +import org.apache.log4j.Logger; + +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.Set; + +public class MultiVersionProtocolEngine implements ProtocolEngine +{ + private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class); + + + + private NetworkDriver _networkDriver; + private Set<VERSION> _supported; + private String _fqdn; + private IApplicationRegistry _appRegistry; + + private volatile ProtocolEngine _delegate = new SelfDelegateProtocolEngine(); + + public MultiVersionProtocolEngine(IApplicationRegistry appRegistry, + String fqdn, + Set<VERSION> supported, NetworkDriver networkDriver) + { + _appRegistry = appRegistry; + _fqdn = fqdn; + _supported = supported; + _networkDriver = networkDriver; + } + + public void setNetworkDriver(NetworkDriver driver) + { + _delegate.setNetworkDriver(driver); + } + + public SocketAddress getRemoteAddress() + { + return _delegate.getRemoteAddress(); + } + + public SocketAddress getLocalAddress() + { + return _delegate.getLocalAddress(); + } + + public long getWrittenBytes() + { + return _delegate.getWrittenBytes(); + } + + public long getReadBytes() + { + return _delegate.getReadBytes(); + } + + public void closed() + { + _delegate.closed(); + } + + public void writerIdle() + { + _delegate.writerIdle(); + } + + public void readerIdle() + { + _delegate.readerIdle(); + } + + public void received(ByteBuffer msg) + { + _delegate.received(msg); + } + + public void exception(Throwable t) + { + _delegate.exception(t); + } + + private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8; + + private static final byte[] AMQP_0_8_HEADER = + new byte[] { (byte) 'A', + (byte) 'M', + (byte) 'Q', + (byte) 'P', + (byte) 1, + (byte) 1, + (byte) 8, + (byte) 0 + }; + + private static final byte[] AMQP_0_9_HEADER = + new byte[] { (byte) 'A', + (byte) 'M', + (byte) 'Q', + (byte) 'P', + (byte) 1, + (byte) 1, + (byte) 0, + (byte) 9 + }; + +private static final byte[] AMQP_0_9_1_HEADER = + new byte[] { (byte) 'A', + (byte) 'M', + (byte) 'Q', + (byte) 'P', + (byte) 1, + (byte) 0, + (byte) 9, + (byte) 1 + }; + + + private static final byte[] AMQP_0_10_HEADER = + new byte[] { (byte) 'A', + (byte) 'M', + (byte) 'Q', + (byte) 'P', + (byte) 1, + (byte) 1, + (byte) 0, + (byte) 10 + }; + + private static interface DelegateCreator + { + VERSION getVersion(); + byte[] getHeaderIdentifier(); + ProtocolEngine getProtocolEngine(); + } + + private DelegateCreator creator_0_8 = new DelegateCreator() + { + + public VERSION getVersion() + { + return VERSION.v0_8; + } + + public byte[] getHeaderIdentifier() + { + return AMQP_0_8_HEADER; + } + + public ProtocolEngine getProtocolEngine() + { + return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver); + } + }; + + private DelegateCreator creator_0_9 = new DelegateCreator() + { + + public VERSION getVersion() + { + return VERSION.v0_9; + } + + + public byte[] getHeaderIdentifier() + { + return AMQP_0_9_HEADER; + } + + public ProtocolEngine getProtocolEngine() + { + return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver); + } + }; + + private DelegateCreator creator_0_9_1 = new DelegateCreator() + { + + public VERSION getVersion() + { + return VERSION.v0_9_1; + } + + + public byte[] getHeaderIdentifier() + { + return AMQP_0_9_1_HEADER; + } + + public ProtocolEngine getProtocolEngine() + { + return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver); + } + }; + + + private DelegateCreator creator_0_10 = new DelegateCreator() + { + + public VERSION getVersion() + { + return VERSION.v0_10; + } + + + public byte[] getHeaderIdentifier() + { + return AMQP_0_10_HEADER; + } + + public ProtocolEngine getProtocolEngine() + { + final ConnectionDelegate connDelegate = + new org.apache.qpid.server.transport.ServerConnectionDelegate(_appRegistry, _fqdn); + + Connection conn = new ServerConnection(); + conn.setConnectionDelegate(connDelegate); + + return new ProtocolEngine_0_10( conn, _networkDriver); + } + }; + + private final DelegateCreator[] _creators = + new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 }; + + + private class SelfDelegateProtocolEngine implements ProtocolEngine + { + + private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES); + + public void setNetworkDriver(NetworkDriver driver) + { + _networkDriver = driver; + } + + public SocketAddress getRemoteAddress() + { + return _networkDriver.getRemoteAddress(); + } + + public SocketAddress getLocalAddress() + { + return _networkDriver.getLocalAddress(); + } + + public long getWrittenBytes() + { + return 0; + } + + public long getReadBytes() + { + return 0; + } + + public void received(ByteBuffer msg) + { + ByteBuffer msgheader = msg.duplicate(); + if(_header.remaining() > msgheader.limit()) + { + msg.position(msg.limit()); + } + else + { + msgheader.limit(_header.remaining()); + msg.position(_header.remaining()); + } + + _header.put(msgheader); + + if(!_header.hasRemaining()) + { + _header.flip(); + byte[] headerBytes = new byte[MINIMUM_REQUIRED_HEADER_BYTES]; + _header.get(headerBytes); + + + ProtocolEngine newDelegate = null; + + for(int i = 0; newDelegate == null && i < _creators.length; i++) + { + + if(_supported.contains(_creators[i].getVersion())) + { + byte[] compareBytes = _creators[i].getHeaderIdentifier(); + boolean equal = true; + for(int j = 0; equal && j<compareBytes.length; j++) + { + equal = headerBytes[j] == compareBytes[j]; + } + if(equal) + { + newDelegate = _creators[i].getProtocolEngine(); + } + + + } + } + // let the first delegate handle completely unknown versions + if(newDelegate == null) + { + newDelegate = _creators[0].getProtocolEngine(); + } + newDelegate.setNetworkDriver(_networkDriver); + + _delegate = newDelegate; + + _header.flip(); + _delegate.received(_header); + if(msg.hasRemaining()) + { + _delegate.received(msg); + } + + } + + } + + public void exception(Throwable t) + { + _logger.error("Error establishing session", t); + } + + public void closed() + { + + } + + public void writerIdle() + { + + } + + public void readerIdle() + { + + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java new file mode 100755 index 0000000000..75358c42d9 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java @@ -0,0 +1,75 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.protocol; + +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; + +import java.util.Set; +import java.util.Arrays; +import java.util.HashSet; + +public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory +{ + ; + + + public enum VERSION { v0_8, v0_9, v0_9_1, v0_10 }; + + private static final Set<VERSION> ALL_VERSIONS = new HashSet<VERSION>(Arrays.asList(VERSION.values())); + + private final IApplicationRegistry _appRegistry; + private final String _fqdn; + private final Set<VERSION> _supported; + + + public MultiVersionProtocolEngineFactory() + { + this(1, "localhost", ALL_VERSIONS); + } + + public MultiVersionProtocolEngineFactory(String fqdn, Set<VERSION> versions) + { + this(1, fqdn, versions); + } + + + public MultiVersionProtocolEngineFactory(String fqdn) + { + this(1, fqdn, ALL_VERSIONS); + } + + public MultiVersionProtocolEngineFactory(int instance, String fqdn, Set<VERSION> supportedVersions) + { + _appRegistry = ApplicationRegistry.getInstance(instance); + _fqdn = fqdn; + _supported = supportedVersions; + } + + + public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver) + { + return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, networkDriver); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index a8ad52f6ee..356a5121e4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -393,7 +393,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if (!isDeleted()) { subscription.setQueue(this, exclusive); - subscription.setNoLocal(_nolocal); + if(_nolocal) + { + subscription.setNoLocal(_nolocal); + } _subscriptionList.add(subscription); if (isDeleted()) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java index 6dce7aacb7..b5f499dee6 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -27,6 +27,7 @@ import java.io.RandomAccessFile; import java.util.Iterator; import java.util.List; import java.util.Locale; +import java.util.Collections; import junit.framework.TestCase; @@ -421,7 +422,7 @@ public class ServerConfigurationTest extends TestCase { // Check default ServerConfiguration serverConfig = new ServerConfiguration(_config); - assertEquals(0, serverConfig.getMaximumMessageCount()); + assertEquals(0, serverConfig.getMaximumMessageCount()); // Check value we set _config.setProperty("maximumMessageCount", 10L); @@ -481,12 +482,17 @@ public class ServerConfigurationTest extends TestCase { // Check default ServerConfiguration serverConfig = new ServerConfiguration(_config); - assertEquals(5672, serverConfig.getPort()); + assertNotNull(serverConfig.getPorts()); + assertEquals(1, serverConfig.getPorts().size()); + assertEquals(5672, serverConfig.getPorts().get(0)); + // Check value we set - _config.setProperty("connector.port", 10); + _config.setProperty("connector.port", "10"); serverConfig = new ServerConfiguration(_config); - assertEquals(10, serverConfig.getPort()); + assertNotNull(serverConfig.getPorts()); + assertEquals(1, serverConfig.getPorts().size()); + assertEquals("10", serverConfig.getPorts().get(0)); } public void testGetBind() throws ConfigurationException @@ -722,7 +728,9 @@ public class ServerConfigurationTest extends TestCase ServerConfiguration config = new ServerConfiguration(mainFile.getAbsoluteFile()); assertEquals(4235, config.getSSLPort()); // From first file, not // overriden by second - assertEquals(2342, config.getPort()); // From the first file, not + assertNotNull(config.getPorts()); + assertEquals(1, config.getPorts().size()); + assertEquals("2342", config.getPorts().get(0)); // From the first file, not // present in the second assertEquals(true, config.getQpidNIO()); // From the second file, not // present in the first @@ -794,7 +802,7 @@ public class ServerConfigurationTest extends TestCase TestNetworkDriver testDriver = new TestNetworkDriver(); testDriver.setRemoteAddress("127.0.0.1"); - + AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver); assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost)); @@ -966,7 +974,7 @@ public class ServerConfigurationTest extends TestCase out.write("\t<rule access=\"deny\" network=\"127.0.0.1\"/>"); out.write("</firewall>\n"); out.close(); - + reg.getConfiguration().reparseConfigFile(); assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost)); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 5659ce0474..2f5d07e396 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -214,14 +214,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if ((id & FAST_CHANNEL_ACCESS_MASK) == 0) { done = (_fastAccessSessions[id] == null); - } + } else { done = (!_slowAccessSessions.keySet().contains(id)); } } } - + return id; } @@ -320,11 +320,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect //Indicates whether we need to sync on every message ack private boolean _syncAck; - + //Indicates the sync publish options (persistent|all) //By default it's async publish - private String _syncPublish = ""; - + private String _syncPublish = ""; + /** * @param broker brokerdetails * @param username username @@ -418,7 +418,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE) != null) { - _syncPersistence = + _syncPersistence = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE)); _logger.warn("sync_persistence is a deprecated property, " + "please use sync_publish={persistent|all} instead"); @@ -453,10 +453,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // use the default value set for all connections _syncPublish = System.getProperty((ClientProperties.SYNC_ACK_PROP_NAME),_syncPublish); } - + + String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10"); + _failoverPolicy = new FailoverPolicy(connectionURL, this); BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails(); - if (brokerDetails.getTransport().equals(BrokerDetails.VM)) + if (brokerDetails.getTransport().equals(BrokerDetails.VM) || "0-8".equals(amqpVersion) || "0-9".equals(amqpVersion)) { _delegate = new AMQConnectionDelegate_8_0(this); } @@ -538,7 +540,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } else if (!_connected) { - retryAllowed = _failoverPolicy.failoverAllowed(); + retryAllowed = _failoverPolicy.failoverAllowed(); brokerDetails = _failoverPolicy.getNextBrokerDetails(); } } @@ -591,7 +593,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect throw new AMQConnectionFailureException(message, connectionException); } - + _connectionMetaData = new QpidConnectionMetaData(this); } @@ -1573,7 +1575,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _syncPersistence; } - + /** * Indicates whether we need to sync on every message ack */ @@ -1581,12 +1583,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _syncAck; } - + public String getSyncPublish() { return _syncPublish; } - + public void setIdleTimeout(long l) { _delegate.setIdleTimeout(l); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java index 3627618e68..a016e90d79 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java @@ -33,7 +33,7 @@ public class ClientProperties public static final String IGNORE_SET_CLIENTID_PROP_NAME = "ignore_setclientID"; /** - * This property is currently used within the 0.10 code path only + * This property is currently used within the 0.10 code path only * The maximum number of pre-fetched messages per destination * This property is used for all the connection unless it is overwritten by the connectionURL * type: long @@ -46,13 +46,13 @@ public class ClientProperties * type: boolean */ public static final String SYNC_PERSISTENT_PROP_NAME = "sync_persistence"; - + /** * When true a sync command is sent after sending a message ack. * type: boolean */ public static final String SYNC_ACK_PROP_NAME = "sync_ack"; - + /** * sync_publish property - {persistent|all} * If set to 'persistent',then persistent messages will be publish synchronously @@ -60,17 +60,17 @@ public class ClientProperties * published synchronously. */ public static final String SYNC_PUBLISH_PROP_NAME = "sync_publish"; - + /** * This value will be used in the following settings * To calculate the SO_TIMEOUT option of the socket (2*idle_timeout) * If this values is between the max and min values specified for heartbeat * by the broker in TuneOK it will be used as the heartbeat interval. - * If not a warning will be printed and the max value specified for + * If not a warning will be printed and the max value specified for * heartbeat in TuneOK will be used */ public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout"; - + /** * ========================================================== @@ -100,4 +100,6 @@ public class ClientProperties */ public static final String WRITE_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit"; public static final String WRITE_BUFFER_LIMIT_DEFAULT = "262144"; + + public static final String AMQP_VERSION = "qpid.amqp.version"; } diff --git a/qpid/java/test-profiles/java-derby.testprofile b/qpid/java/test-profiles/java-derby.testprofile index e7c1dc1fd6..fb339c6823 100644 --- a/qpid/java/test-profiles/java-derby.testprofile +++ b/qpid/java/test-profiles/java-derby.testprofile @@ -1,5 +1,5 @@ broker.language=java -broker=${project.root}/build/bin/qpid-server -p @PORT -m @MPORT -c @CONFIG_FILE -l ${test.profiles}/log4j-test.xml +broker=${project.root}/build/bin/qpid-server -p @PORT -m @MPORT --exclude-0-10 @PORT -c @CONFIG_FILE -l ${test.profiles}/log4j-test.xml broker.clean=${test.profiles}/clean-dir ${build.data} ${project.root}/build/work/derbyDB broker.ready=BRK-1004 broker.stopped=Exception diff --git a/qpid/java/test-profiles/java.testprofile b/qpid/java/test-profiles/java.testprofile index 05a801ee06..a9e067e143 100644 --- a/qpid/java/test-profiles/java.testprofile +++ b/qpid/java/test-profiles/java.testprofile @@ -1,5 +1,5 @@ broker.language=java -broker=${project.root}/build/bin/qpid-server -p @PORT -m @MPORT -c @CONFIG_FILE -l ${test.profiles}/log4j-test.xml +broker=${project.root}/build/bin/qpid-server -p @PORT -m @MPORT --exclude-0-10 @PORT -c @CONFIG_FILE -l ${test.profiles}/log4j-test.xml broker.clean=${test.profiles}/clean-dir ${build.data} ${project.root}/build/work/derbyDB broker.ready=BRK-1004 broker.stopped=Exception |