summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-10-23 16:11:43 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-10-23 16:11:43 +0000
commita7fe503beac48244dbbc06878821c603ca09acd3 (patch)
tree6d76c926f5cb286ec352cb93fea17b61bc73070d
parent2aff2a16161c1912aa355dd88696ebf5f48317f4 (diff)
downloadqpid-python-a7fe503beac48244dbbc06878821c603ca09acd3.tar.gz
Get all protocols running on the same port
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@829116 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java202
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java21
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java366
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java75
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java26
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java28
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java14
-rw-r--r--qpid/java/test-profiles/java-derby.testprofile2
-rw-r--r--qpid/java/test-profiles/java.testprofile2
10 files changed, 640 insertions, 101 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
index 326352b644..0a2bd6e8b0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -26,6 +26,11 @@ import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Properties;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Collection;
+import java.util.Arrays;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
@@ -37,8 +42,6 @@ import org.apache.commons.cli.PosixParser;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.apache.log4j.xml.QpidLog4JConfigurator;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.transport.network.io.IoTransport;
import org.apache.qpid.transport.network.ConnectionBinding;
import org.apache.qpid.transport.*;
import org.apache.qpid.common.QpidProperties;
@@ -52,15 +55,16 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.management.LoggingManagementMBean;
import org.apache.qpid.server.logging.messages.BrokerMessages;
import org.apache.qpid.server.protocol.AMQProtocolEngineFactory;
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.transport.ServerConnection;
import org.apache.qpid.server.transport.QpidAcceptor;
import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.network.mina.MINANetworkDriver;
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION;
/**
* Main entry point for AMQPD.
@@ -79,6 +83,7 @@ public class Main
private static final int IPV4_ADDRESS_LENGTH = 4;
private static final char IPV4_LITERAL_SEPARATOR = '.';
+ private static final Collection<VERSION> ALL_VERSIONS = Arrays.asList(VERSION.values());
protected static class InitException extends Exception
{
@@ -129,6 +134,24 @@ public class Main
OptionBuilder.withArgName("port").hasArg()
.withDescription("listen on the specified port. Overrides any value in the config file")
.withLongOpt("port").create("p");
+
+ Option exclude0_10 =
+ OptionBuilder.withArgName("exclude-0-10").hasArg()
+ .withDescription("when listening on the specified port do not accept AMQP0-10 connections. The specified port must be one specified on the command line")
+ .withLongOpt("exclude-0-10").create();
+
+ Option exclude0_9 =
+ OptionBuilder.withArgName("exclude-0-9").hasArg()
+ .withDescription("when listening on the specified port do not accept AMQP0-9 connections. The specified port must be one specified on the command line")
+ .withLongOpt("exclude-0-9").create();
+
+
+ Option exclude0_8 =
+ OptionBuilder.withArgName("exclude-0-8").hasArg()
+ .withDescription("when listening on the specified port do not accept AMQP0-8 connections. The specified port must be one specified on the command line")
+ .withLongOpt("exclude-0-8").create();
+
+
Option mport =
OptionBuilder.withArgName("mport").hasArg()
.withDescription("listen on the specified management port. Overrides any value in the config file")
@@ -155,6 +178,9 @@ public class Main
options.addOption(logconfig);
options.addOption(logwatchconfig);
options.addOption(port);
+ options.addOption(exclude0_10);
+ options.addOption(exclude0_9);
+ options.addOption(exclude0_8);
options.addOption(mport);
options.addOption(bind);
}
@@ -303,23 +329,35 @@ public class Main
_brokerLogger.info("Starting Qpid Broker " + QpidProperties.getReleaseVersion()
+ " build: " + QpidProperties.getBuildVersion());
- int port = serverConfig.getPort();
- String portStr = commandLine.getOptionValue("p");
- if (portStr != null)
+
+ String[] portStr = commandLine.getOptionValues("p");
+
+ Set<Integer> ports = new HashSet<Integer>();
+ Set<Integer> exclude_0_10 = new HashSet<Integer>();
+ Set<Integer> exclude_0_9 = new HashSet<Integer>();
+ Set<Integer> exclude_0_8 = new HashSet<Integer>();
+
+ if(portStr == null || portStr.length == 0)
{
- try
- {
- port = Integer.parseInt(portStr);
- }
- catch (NumberFormatException e)
- {
- throw new InitException("Invalid port: " + portStr, e);
- }
+
+ parsePortList(ports, serverConfig.getPorts());
+ parsePortList(exclude_0_10, serverConfig.getPortExclude010());
+ parsePortList(exclude_0_9, serverConfig.getPortExclude09());
+ parsePortList(exclude_0_8, serverConfig.getPortExclude08());
+
}
+ else
+ {
+ parsePortArray(ports, portStr);
+ parsePortArray(exclude_0_10, commandLine.getOptionValues("exclude-0-10"));
+ parsePortArray(exclude_0_9, commandLine.getOptionValues("exclude-0-9"));
+ parsePortArray(exclude_0_8, commandLine.getOptionValues("exclude-0-8"));
+
+ }
+
+
- //TODO - HACK
- port += 10;
String bindAddr = commandLine.getOptionValue("b");
if (bindAddr == null)
@@ -327,15 +365,21 @@ public class Main
bindAddr = serverConfig.getBind();
}
InetAddress bindAddress = null;
+
+
+
if (bindAddr.equals("wildcard"))
{
- bindAddress = new InetSocketAddress(port).getAddress();
+ bindAddress = new InetSocketAddress(0).getAddress();
}
else
{
bindAddress = InetAddress.getByAddress(parseIP(bindAddr));
}
+ String hostName = bindAddress.getCanonicalHostName();
+
+
String keystorePath = serverConfig.getKeystorePath();
String keystorePassword = serverConfig.getKeystorePassword();
String certType = serverConfig.getCertType();
@@ -343,12 +387,40 @@ public class Main
if (!serverConfig.getSSLOnly())
{
- NetworkDriver driver = new MINANetworkDriver();
- driver.bind(port, new InetAddress[]{bindAddress}, new AMQProtocolEngineFactory(),
- serverConfig.getNetworkConfiguration(), null);
- ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
- new QpidAcceptor(driver,"TCP"));
- CurrentActor.get().message(BrokerMessages.BRK_1002("TCP", port));
+
+ for(int port : ports)
+ {
+
+ NetworkDriver driver = new MINANetworkDriver();
+
+ Set<VERSION> supported = new HashSet<VERSION>(ALL_VERSIONS);
+
+ if(exclude_0_10.contains(port))
+ {
+ supported.remove(VERSION.v0_10);
+ }
+ if(exclude_0_9.contains(port))
+ {
+ supported.remove(VERSION.v0_9);
+ }
+ if(exclude_0_8.contains(port))
+ {
+ supported.remove(VERSION.v0_8);
+ }
+
+ MultiVersionProtocolEngineFactory protocolEngineFactory =
+ new MultiVersionProtocolEngineFactory(hostName, supported);
+
+
+
+ driver.bind(port, new InetAddress[]{bindAddress}, protocolEngineFactory,
+ serverConfig.getNetworkConfiguration(), null);
+ ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
+ new QpidAcceptor(driver,"TCP"));
+ CurrentActor.get().message(BrokerMessages.BRK_1002("TCP", port));
+
+ }
+
}
if (serverConfig.getEnableSSL())
@@ -357,7 +429,7 @@ public class Main
NetworkDriver driver = new MINANetworkDriver();
driver.bind(serverConfig.getSSLPort(), new InetAddress[]{bindAddress},
new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory);
- ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
+ ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, serverConfig.getSSLPort()),
new QpidAcceptor(driver,"TCP"));
CurrentActor.get().message(BrokerMessages.BRK_1002("TCP/SSL", serverConfig.getSSLPort()));
}
@@ -366,61 +438,55 @@ public class Main
_brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion()
+ " build: " + QpidProperties.getBuildVersion());
+ CurrentActor.get().message(BrokerMessages.BRK_1004());
- int port_0_10 = port - 10;
-
- IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
- final ConnectionDelegate delegate =
- new org.apache.qpid.server.transport.ServerConnectionDelegate(appRegistry, bindAddress.getCanonicalHostName());
-
-
-
-
-/*
- NetworkDriver driver = new MINANetworkDriver();
- driver.bind(port_0_10, new InetAddress[]{bindAddress}, new ProtocolEngineFactory_0_10(delegate),
- serverConfig.getNetworkConfiguration(), null);
- ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
- new QpidAcceptor(driver,"TCP"));
- CurrentActor.get().message(BrokerMessages.BRK_1002("TCP", port));
-*/
-
-
-
-
-
- // TODO - Fix to use a proper binding
-
-
+ }
+ finally
+ {
+ // Startup is complete so remove the AR initialised Startup actor
+ CurrentActor.remove();
+ }
+ }
- ConnectionBinding cb = new ConnectionBinding()
+ private void parsePortArray(Set<Integer> ports, String[] portStr)
+ throws InitException
+ {
+ if(portStr != null)
+ {
+ for(int i = 0; i < portStr.length; i++)
{
- public Connection connection()
+ try
{
- ServerConnection conn = new ServerConnection();
- conn.setConnectionDelegate(delegate);
- return conn;
+ ports.add(Integer.parseInt(portStr[i]));
}
- };
-
- org.apache.qpid.transport.network.io.IoAcceptor ioa = new org.apache.qpid.transport.network.io.IoAcceptor
- ("0.0.0.0", port_0_10, cb);
- ioa.start();
-
- CurrentActor.get().message(BrokerMessages.BRK_1004());
-
+ catch (NumberFormatException e)
+ {
+ throw new InitException("Invalid port: " + portStr[i], e);
+ }
+ }
}
- finally
+ }
+
+ private void parsePortList(Set<Integer> output, List input)
+ throws InitException
+ {
+ if(input != null)
{
- // Startup is complete so remove the AR initialised Startup actor
- CurrentActor.remove();
+ for(Object port : input)
+ {
+ try
+ {
+ output.add(Integer.parseInt(String.valueOf(port)));
+ }
+ catch (NumberFormatException e)
+ {
+ throw new InitException("Invalid port: " + port, e);
+ }
+ }
}
-
-
-
}
/**
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index 566f5a2de1..7bf28c7560 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Locale;
+import java.util.Collections;
import java.util.Map.Entry;
import org.apache.commons.configuration.CompositeConfiguration;
@@ -524,11 +525,27 @@ public class ServerConfiguration implements SignalHandler
return getConfig().getInt("connector.processors", 4);
}
- public int getPort()
+ public List getPorts()
{
- return getConfig().getInt("connector.port", DEFAULT_PORT);
+ return getConfig().getList("connector.port", Collections.singletonList(DEFAULT_PORT));
}
+ public List getPortExclude010()
+ {
+ return getConfig().getList("connector.non010port", Collections.EMPTY_LIST);
+ }
+
+ public List getPortExclude09()
+ {
+ return getConfig().getList("connector.non09port", Collections.EMPTY_LIST);
+ }
+
+ public List getPortExclude08()
+ {
+ return getConfig().getList("connector.non08port", Collections.EMPTY_LIST);
+ }
+
+
public String getBind()
{
return getConfig().getString("connector.bind", "wildcard");
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
new file mode 100755
index 0000000000..78e21a8f14
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -0,0 +1,366 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionDelegate;
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION;
+import org.apache.qpid.server.transport.ServerConnection;
+import org.apache.log4j.Logger;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+public class MultiVersionProtocolEngine implements ProtocolEngine
+{
+ private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class);
+
+
+
+ private NetworkDriver _networkDriver;
+ private Set<VERSION> _supported;
+ private String _fqdn;
+ private IApplicationRegistry _appRegistry;
+
+ private volatile ProtocolEngine _delegate = new SelfDelegateProtocolEngine();
+
+ public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
+ String fqdn,
+ Set<VERSION> supported, NetworkDriver networkDriver)
+ {
+ _appRegistry = appRegistry;
+ _fqdn = fqdn;
+ _supported = supported;
+ _networkDriver = networkDriver;
+ }
+
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _delegate.setNetworkDriver(driver);
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _delegate.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _delegate.getLocalAddress();
+ }
+
+ public long getWrittenBytes()
+ {
+ return _delegate.getWrittenBytes();
+ }
+
+ public long getReadBytes()
+ {
+ return _delegate.getReadBytes();
+ }
+
+ public void closed()
+ {
+ _delegate.closed();
+ }
+
+ public void writerIdle()
+ {
+ _delegate.writerIdle();
+ }
+
+ public void readerIdle()
+ {
+ _delegate.readerIdle();
+ }
+
+ public void received(ByteBuffer msg)
+ {
+ _delegate.received(msg);
+ }
+
+ public void exception(Throwable t)
+ {
+ _delegate.exception(t);
+ }
+
+ private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
+
+ private static final byte[] AMQP_0_8_HEADER =
+ new byte[] { (byte) 'A',
+ (byte) 'M',
+ (byte) 'Q',
+ (byte) 'P',
+ (byte) 1,
+ (byte) 1,
+ (byte) 8,
+ (byte) 0
+ };
+
+ private static final byte[] AMQP_0_9_HEADER =
+ new byte[] { (byte) 'A',
+ (byte) 'M',
+ (byte) 'Q',
+ (byte) 'P',
+ (byte) 1,
+ (byte) 1,
+ (byte) 0,
+ (byte) 9
+ };
+
+private static final byte[] AMQP_0_9_1_HEADER =
+ new byte[] { (byte) 'A',
+ (byte) 'M',
+ (byte) 'Q',
+ (byte) 'P',
+ (byte) 1,
+ (byte) 0,
+ (byte) 9,
+ (byte) 1
+ };
+
+
+ private static final byte[] AMQP_0_10_HEADER =
+ new byte[] { (byte) 'A',
+ (byte) 'M',
+ (byte) 'Q',
+ (byte) 'P',
+ (byte) 1,
+ (byte) 1,
+ (byte) 0,
+ (byte) 10
+ };
+
+ private static interface DelegateCreator
+ {
+ VERSION getVersion();
+ byte[] getHeaderIdentifier();
+ ProtocolEngine getProtocolEngine();
+ }
+
+ private DelegateCreator creator_0_8 = new DelegateCreator()
+ {
+
+ public VERSION getVersion()
+ {
+ return VERSION.v0_8;
+ }
+
+ public byte[] getHeaderIdentifier()
+ {
+ return AMQP_0_8_HEADER;
+ }
+
+ public ProtocolEngine getProtocolEngine()
+ {
+ return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver);
+ }
+ };
+
+ private DelegateCreator creator_0_9 = new DelegateCreator()
+ {
+
+ public VERSION getVersion()
+ {
+ return VERSION.v0_9;
+ }
+
+
+ public byte[] getHeaderIdentifier()
+ {
+ return AMQP_0_9_HEADER;
+ }
+
+ public ProtocolEngine getProtocolEngine()
+ {
+ return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver);
+ }
+ };
+
+ private DelegateCreator creator_0_9_1 = new DelegateCreator()
+ {
+
+ public VERSION getVersion()
+ {
+ return VERSION.v0_9_1;
+ }
+
+
+ public byte[] getHeaderIdentifier()
+ {
+ return AMQP_0_9_1_HEADER;
+ }
+
+ public ProtocolEngine getProtocolEngine()
+ {
+ return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver);
+ }
+ };
+
+
+ private DelegateCreator creator_0_10 = new DelegateCreator()
+ {
+
+ public VERSION getVersion()
+ {
+ return VERSION.v0_10;
+ }
+
+
+ public byte[] getHeaderIdentifier()
+ {
+ return AMQP_0_10_HEADER;
+ }
+
+ public ProtocolEngine getProtocolEngine()
+ {
+ final ConnectionDelegate connDelegate =
+ new org.apache.qpid.server.transport.ServerConnectionDelegate(_appRegistry, _fqdn);
+
+ Connection conn = new ServerConnection();
+ conn.setConnectionDelegate(connDelegate);
+
+ return new ProtocolEngine_0_10( conn, _networkDriver);
+ }
+ };
+
+ private final DelegateCreator[] _creators =
+ new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 };
+
+
+ private class SelfDelegateProtocolEngine implements ProtocolEngine
+ {
+
+ private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
+
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _networkDriver = driver;
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _networkDriver.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _networkDriver.getLocalAddress();
+ }
+
+ public long getWrittenBytes()
+ {
+ return 0;
+ }
+
+ public long getReadBytes()
+ {
+ return 0;
+ }
+
+ public void received(ByteBuffer msg)
+ {
+ ByteBuffer msgheader = msg.duplicate();
+ if(_header.remaining() > msgheader.limit())
+ {
+ msg.position(msg.limit());
+ }
+ else
+ {
+ msgheader.limit(_header.remaining());
+ msg.position(_header.remaining());
+ }
+
+ _header.put(msgheader);
+
+ if(!_header.hasRemaining())
+ {
+ _header.flip();
+ byte[] headerBytes = new byte[MINIMUM_REQUIRED_HEADER_BYTES];
+ _header.get(headerBytes);
+
+
+ ProtocolEngine newDelegate = null;
+
+ for(int i = 0; newDelegate == null && i < _creators.length; i++)
+ {
+
+ if(_supported.contains(_creators[i].getVersion()))
+ {
+ byte[] compareBytes = _creators[i].getHeaderIdentifier();
+ boolean equal = true;
+ for(int j = 0; equal && j<compareBytes.length; j++)
+ {
+ equal = headerBytes[j] == compareBytes[j];
+ }
+ if(equal)
+ {
+ newDelegate = _creators[i].getProtocolEngine();
+ }
+
+
+ }
+ }
+ // let the first delegate handle completely unknown versions
+ if(newDelegate == null)
+ {
+ newDelegate = _creators[0].getProtocolEngine();
+ }
+ newDelegate.setNetworkDriver(_networkDriver);
+
+ _delegate = newDelegate;
+
+ _header.flip();
+ _delegate.received(_header);
+ if(msg.hasRemaining())
+ {
+ _delegate.received(msg);
+ }
+
+ }
+
+ }
+
+ public void exception(Throwable t)
+ {
+ _logger.error("Error establishing session", t);
+ }
+
+ public void closed()
+ {
+
+ }
+
+ public void writerIdle()
+ {
+
+ }
+
+ public void readerIdle()
+ {
+
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
new file mode 100755
index 0000000000..75358c42d9
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
@@ -0,0 +1,75 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+
+import java.util.Set;
+import java.util.Arrays;
+import java.util.HashSet;
+
+public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
+{
+ ;
+
+
+ public enum VERSION { v0_8, v0_9, v0_9_1, v0_10 };
+
+ private static final Set<VERSION> ALL_VERSIONS = new HashSet<VERSION>(Arrays.asList(VERSION.values()));
+
+ private final IApplicationRegistry _appRegistry;
+ private final String _fqdn;
+ private final Set<VERSION> _supported;
+
+
+ public MultiVersionProtocolEngineFactory()
+ {
+ this(1, "localhost", ALL_VERSIONS);
+ }
+
+ public MultiVersionProtocolEngineFactory(String fqdn, Set<VERSION> versions)
+ {
+ this(1, fqdn, versions);
+ }
+
+
+ public MultiVersionProtocolEngineFactory(String fqdn)
+ {
+ this(1, fqdn, ALL_VERSIONS);
+ }
+
+ public MultiVersionProtocolEngineFactory(int instance, String fqdn, Set<VERSION> supportedVersions)
+ {
+ _appRegistry = ApplicationRegistry.getInstance(instance);
+ _fqdn = fqdn;
+ _supported = supportedVersions;
+ }
+
+
+ public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver)
+ {
+ return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, networkDriver);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index a8ad52f6ee..356a5121e4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -393,7 +393,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if (!isDeleted())
{
subscription.setQueue(this, exclusive);
- subscription.setNoLocal(_nolocal);
+ if(_nolocal)
+ {
+ subscription.setNoLocal(_nolocal);
+ }
_subscriptionList.add(subscription);
if (isDeleted())
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
index 6dce7aacb7..b5f499dee6 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,6 +27,7 @@ import java.io.RandomAccessFile;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
+import java.util.Collections;
import junit.framework.TestCase;
@@ -421,7 +422,7 @@ public class ServerConfigurationTest extends TestCase
{
// Check default
ServerConfiguration serverConfig = new ServerConfiguration(_config);
- assertEquals(0, serverConfig.getMaximumMessageCount());
+ assertEquals(0, serverConfig.getMaximumMessageCount());
// Check value we set
_config.setProperty("maximumMessageCount", 10L);
@@ -481,12 +482,17 @@ public class ServerConfigurationTest extends TestCase
{
// Check default
ServerConfiguration serverConfig = new ServerConfiguration(_config);
- assertEquals(5672, serverConfig.getPort());
+ assertNotNull(serverConfig.getPorts());
+ assertEquals(1, serverConfig.getPorts().size());
+ assertEquals(5672, serverConfig.getPorts().get(0));
+
// Check value we set
- _config.setProperty("connector.port", 10);
+ _config.setProperty("connector.port", "10");
serverConfig = new ServerConfiguration(_config);
- assertEquals(10, serverConfig.getPort());
+ assertNotNull(serverConfig.getPorts());
+ assertEquals(1, serverConfig.getPorts().size());
+ assertEquals("10", serverConfig.getPorts().get(0));
}
public void testGetBind() throws ConfigurationException
@@ -722,7 +728,9 @@ public class ServerConfigurationTest extends TestCase
ServerConfiguration config = new ServerConfiguration(mainFile.getAbsoluteFile());
assertEquals(4235, config.getSSLPort()); // From first file, not
// overriden by second
- assertEquals(2342, config.getPort()); // From the first file, not
+ assertNotNull(config.getPorts());
+ assertEquals(1, config.getPorts().size());
+ assertEquals("2342", config.getPorts().get(0)); // From the first file, not
// present in the second
assertEquals(true, config.getQpidNIO()); // From the second file, not
// present in the first
@@ -794,7 +802,7 @@ public class ServerConfigurationTest extends TestCase
TestNetworkDriver testDriver = new TestNetworkDriver();
testDriver.setRemoteAddress("127.0.0.1");
-
+
AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost));
@@ -966,7 +974,7 @@ public class ServerConfigurationTest extends TestCase
out.write("\t<rule access=\"deny\" network=\"127.0.0.1\"/>");
out.write("</firewall>\n");
out.close();
-
+
reg.getConfiguration().reparseConfigFile();
assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost));
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 5659ce0474..2f5d07e396 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -214,14 +214,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
if ((id & FAST_CHANNEL_ACCESS_MASK) == 0)
{
done = (_fastAccessSessions[id] == null);
- }
+ }
else
{
done = (!_slowAccessSessions.keySet().contains(id));
}
}
}
-
+
return id;
}
@@ -320,11 +320,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
//Indicates whether we need to sync on every message ack
private boolean _syncAck;
-
+
//Indicates the sync publish options (persistent|all)
//By default it's async publish
- private String _syncPublish = "";
-
+ private String _syncPublish = "";
+
/**
* @param broker brokerdetails
* @param username username
@@ -418,7 +418,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE) != null)
{
- _syncPersistence =
+ _syncPersistence =
Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE));
_logger.warn("sync_persistence is a deprecated property, " +
"please use sync_publish={persistent|all} instead");
@@ -453,10 +453,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// use the default value set for all connections
_syncPublish = System.getProperty((ClientProperties.SYNC_ACK_PROP_NAME),_syncPublish);
}
-
+
+ String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10");
+
_failoverPolicy = new FailoverPolicy(connectionURL, this);
BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails();
- if (brokerDetails.getTransport().equals(BrokerDetails.VM))
+ if (brokerDetails.getTransport().equals(BrokerDetails.VM) || "0-8".equals(amqpVersion) || "0-9".equals(amqpVersion))
{
_delegate = new AMQConnectionDelegate_8_0(this);
}
@@ -538,7 +540,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
else if (!_connected)
{
- retryAllowed = _failoverPolicy.failoverAllowed();
+ retryAllowed = _failoverPolicy.failoverAllowed();
brokerDetails = _failoverPolicy.getNextBrokerDetails();
}
}
@@ -591,7 +593,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
throw new AMQConnectionFailureException(message, connectionException);
}
-
+
_connectionMetaData = new QpidConnectionMetaData(this);
}
@@ -1573,7 +1575,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _syncPersistence;
}
-
+
/**
* Indicates whether we need to sync on every message ack
*/
@@ -1581,12 +1583,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _syncAck;
}
-
+
public String getSyncPublish()
{
return _syncPublish;
}
-
+
public void setIdleTimeout(long l)
{
_delegate.setIdleTimeout(l);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
index 3627618e68..a016e90d79 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
@@ -33,7 +33,7 @@ public class ClientProperties
public static final String IGNORE_SET_CLIENTID_PROP_NAME = "ignore_setclientID";
/**
- * This property is currently used within the 0.10 code path only
+ * This property is currently used within the 0.10 code path only
* The maximum number of pre-fetched messages per destination
* This property is used for all the connection unless it is overwritten by the connectionURL
* type: long
@@ -46,13 +46,13 @@ public class ClientProperties
* type: boolean
*/
public static final String SYNC_PERSISTENT_PROP_NAME = "sync_persistence";
-
+
/**
* When true a sync command is sent after sending a message ack.
* type: boolean
*/
public static final String SYNC_ACK_PROP_NAME = "sync_ack";
-
+
/**
* sync_publish property - {persistent|all}
* If set to 'persistent',then persistent messages will be publish synchronously
@@ -60,17 +60,17 @@ public class ClientProperties
* published synchronously.
*/
public static final String SYNC_PUBLISH_PROP_NAME = "sync_publish";
-
+
/**
* This value will be used in the following settings
* To calculate the SO_TIMEOUT option of the socket (2*idle_timeout)
* If this values is between the max and min values specified for heartbeat
* by the broker in TuneOK it will be used as the heartbeat interval.
- * If not a warning will be printed and the max value specified for
+ * If not a warning will be printed and the max value specified for
* heartbeat in TuneOK will be used
*/
public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout";
-
+
/**
* ==========================================================
@@ -100,4 +100,6 @@ public class ClientProperties
*/
public static final String WRITE_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit";
public static final String WRITE_BUFFER_LIMIT_DEFAULT = "262144";
+
+ public static final String AMQP_VERSION = "qpid.amqp.version";
}
diff --git a/qpid/java/test-profiles/java-derby.testprofile b/qpid/java/test-profiles/java-derby.testprofile
index e7c1dc1fd6..fb339c6823 100644
--- a/qpid/java/test-profiles/java-derby.testprofile
+++ b/qpid/java/test-profiles/java-derby.testprofile
@@ -1,5 +1,5 @@
broker.language=java
-broker=${project.root}/build/bin/qpid-server -p @PORT -m @MPORT -c @CONFIG_FILE -l ${test.profiles}/log4j-test.xml
+broker=${project.root}/build/bin/qpid-server -p @PORT -m @MPORT --exclude-0-10 @PORT -c @CONFIG_FILE -l ${test.profiles}/log4j-test.xml
broker.clean=${test.profiles}/clean-dir ${build.data} ${project.root}/build/work/derbyDB
broker.ready=BRK-1004
broker.stopped=Exception
diff --git a/qpid/java/test-profiles/java.testprofile b/qpid/java/test-profiles/java.testprofile
index 05a801ee06..a9e067e143 100644
--- a/qpid/java/test-profiles/java.testprofile
+++ b/qpid/java/test-profiles/java.testprofile
@@ -1,5 +1,5 @@
broker.language=java
-broker=${project.root}/build/bin/qpid-server -p @PORT -m @MPORT -c @CONFIG_FILE -l ${test.profiles}/log4j-test.xml
+broker=${project.root}/build/bin/qpid-server -p @PORT -m @MPORT --exclude-0-10 @PORT -c @CONFIG_FILE -l ${test.profiles}/log4j-test.xml
broker.clean=${test.profiles}/clean-dir ${build.data} ${project.root}/build/work/derbyDB
broker.ready=BRK-1004
broker.stopped=Exception