diff options
Diffstat (limited to 'java/broker/src')
4 files changed, 91 insertions, 13 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index 845983857c..90afd2e4ac 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -140,6 +140,12 @@ public class Main .withDescription("when listening on the specified port do not accept AMQP0-10 connections. The specified port must be one specified on the command line") .withLongOpt("exclude-0-10").create(); + Option exclude0_9_1 = + OptionBuilder.withArgName("exclude-0-9-1").hasArg() + .withDescription("when listening on the specified port do not accept AMQP0-9-1 connections. The specified port must be one specified on the command line") + .withLongOpt("exclude-0-9-1").create(); + + Option exclude0_9 = OptionBuilder.withArgName("exclude-0-9").hasArg() .withDescription("when listening on the specified port do not accept AMQP0-9 connections. The specified port must be one specified on the command line") @@ -179,6 +185,7 @@ public class Main options.addOption(logwatchconfig); options.addOption(port); options.addOption(exclude0_10); + options.addOption(exclude0_9_1); options.addOption(exclude0_9); options.addOption(exclude0_8); options.addOption(mport); @@ -335,6 +342,7 @@ public class Main Set<Integer> ports = new HashSet<Integer>(); Set<Integer> exclude_0_10 = new HashSet<Integer>(); + Set<Integer> exclude_0_9_1 = new HashSet<Integer>(); Set<Integer> exclude_0_9 = new HashSet<Integer>(); Set<Integer> exclude_0_8 = new HashSet<Integer>(); @@ -343,6 +351,7 @@ public class Main parsePortList(ports, serverConfig.getPorts()); parsePortList(exclude_0_10, serverConfig.getPortExclude010()); + parsePortList(exclude_0_9_1, serverConfig.getPortExclude091()); parsePortList(exclude_0_9, serverConfig.getPortExclude09()); parsePortList(exclude_0_8, serverConfig.getPortExclude08()); @@ -351,6 +360,7 @@ public class Main { parsePortArray(ports, portStr); parsePortArray(exclude_0_10, commandLine.getOptionValues("exclude-0-10")); + parsePortArray(exclude_0_9_1, commandLine.getOptionValues("exclude-0-9-1")); parsePortArray(exclude_0_9, commandLine.getOptionValues("exclude-0-9")); parsePortArray(exclude_0_8, commandLine.getOptionValues("exclude-0-8")); @@ -399,6 +409,11 @@ public class Main { supported.remove(VERSION.v0_10); } + + if(exclude_0_9_1.contains(port)) + { + supported.remove(VERSION.v0_9_1); + } if(exclude_0_9.contains(port)) { supported.remove(VERSION.v0_9); diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index 66a7279134..879eb7c9e6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -538,6 +538,11 @@ public class ServerConfiguration implements SignalHandler return getConfig().getList("connector.non010port", Collections.EMPTY_LIST); } + public List getPortExclude091() + { + return getConfig().getList("connector.non091port", Collections.EMPTY_LIST); + } + public List getPortExclude09() { return getConfig().getList("connector.non09port", Collections.EMPTY_LIST); diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java index 3a94160e22..dbefeb61f2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java @@ -45,7 +45,6 @@ public class ProtocolOutputConverterRegistry register(ProtocolVersion.v8_0, org.apache.qpid.server.output.amqp0_8.ProtocolOutputConverterImpl.getInstanceFactory());
register(ProtocolVersion.v0_9, org.apache.qpid.server.output.amqp0_9.ProtocolOutputConverterImpl.getInstanceFactory());
register(ProtocolVersion.v0_91, org.apache.qpid.server.output.amqp0_9_1.ProtocolOutputConverterImpl.getInstanceFactory());
-
}
private static void register(ProtocolVersion version, Factory converter)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 78e21a8f14..9a1c6c9418 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -135,7 +135,7 @@ private static final byte[] AMQP_0_9_1_HEADER = (byte) 'M', (byte) 'Q', (byte) 'P', - (byte) 1, + (byte) 0, (byte) 0, (byte) 9, (byte) 1 @@ -250,6 +250,59 @@ private static final byte[] AMQP_0_9_1_HEADER = new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 }; + private class ClosedDelegateProtocolEngine implements ProtocolEngine + { + public void setNetworkDriver(NetworkDriver driver) + { + _networkDriver = driver; + } + + public SocketAddress getRemoteAddress() + { + return _networkDriver.getRemoteAddress(); + } + + public SocketAddress getLocalAddress() + { + return _networkDriver.getLocalAddress(); + } + + public long getWrittenBytes() + { + return 0; + } + + public long getReadBytes() + { + return 0; + } + + public void received(ByteBuffer msg) + { + _logger.error("Error processing incoming data, could not negotiate a common protocol"); + } + + public void exception(Throwable t) + { + _logger.error("Error establishing session", t); + } + + public void closed() + { + + } + + public void writerIdle() + { + + } + + public void readerIdle() + { + + } + } + private class SelfDelegateProtocolEngine implements ProtocolEngine { @@ -303,12 +356,14 @@ private static final byte[] AMQP_0_9_1_HEADER = ProtocolEngine newDelegate = null; + byte[] newestSupported = null; for(int i = 0; newDelegate == null && i < _creators.length; i++) { if(_supported.contains(_creators[i].getVersion())) { + newestSupported = _creators[i].getHeaderIdentifier(); byte[] compareBytes = _creators[i].getHeaderIdentifier(); boolean equal = true; for(int j = 0; equal && j<compareBytes.length; j++) @@ -319,24 +374,28 @@ private static final byte[] AMQP_0_9_1_HEADER = { newDelegate = _creators[i].getProtocolEngine(); } - - } } - // let the first delegate handle completely unknown versions + + // If no delegate is found then send back the most recent support protocol version id if(newDelegate == null) { - newDelegate = _creators[0].getProtocolEngine(); + _networkDriver.send(ByteBuffer.wrap(newestSupported)); + + newDelegate = new ClosedDelegateProtocolEngine(); } - newDelegate.setNetworkDriver(_networkDriver); + else + { + newDelegate.setNetworkDriver(_networkDriver); - _delegate = newDelegate; + _delegate = newDelegate; - _header.flip(); - _delegate.received(_header); - if(msg.hasRemaining()) - { - _delegate.received(msg); + _header.flip(); + _delegate.received(_header); + if(msg.hasRemaining()) + { + _delegate.received(msg); + } } } |