diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2009-12-18 16:23:19 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2009-12-18 16:23:19 +0000 |
commit | 198b43a1d3d58356949029eb64995711a1026c9b (patch) | |
tree | c387256a9ccf5d5765b2a1555b7b502c89308962 | |
parent | 8a0e1af291ef3ae3361ec91e9f09637ab1f0627b (diff) | |
download | qpid-python-198b43a1d3d58356949029eb64995711a1026c9b.tar.gz |
QPID-2273 : Fix Protocol Negotiation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@892301 13f79535-47bb-0310-9956-ffa450edef68
14 files changed, 156 insertions, 31 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index 845983857c..90afd2e4ac 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -140,6 +140,12 @@ public class Main .withDescription("when listening on the specified port do not accept AMQP0-10 connections. The specified port must be one specified on the command line") .withLongOpt("exclude-0-10").create(); + Option exclude0_9_1 = + OptionBuilder.withArgName("exclude-0-9-1").hasArg() + .withDescription("when listening on the specified port do not accept AMQP0-9-1 connections. The specified port must be one specified on the command line") + .withLongOpt("exclude-0-9-1").create(); + + Option exclude0_9 = OptionBuilder.withArgName("exclude-0-9").hasArg() .withDescription("when listening on the specified port do not accept AMQP0-9 connections. The specified port must be one specified on the command line") @@ -179,6 +185,7 @@ public class Main options.addOption(logwatchconfig); options.addOption(port); options.addOption(exclude0_10); + options.addOption(exclude0_9_1); options.addOption(exclude0_9); options.addOption(exclude0_8); options.addOption(mport); @@ -335,6 +342,7 @@ public class Main Set<Integer> ports = new HashSet<Integer>(); Set<Integer> exclude_0_10 = new HashSet<Integer>(); + Set<Integer> exclude_0_9_1 = new HashSet<Integer>(); Set<Integer> exclude_0_9 = new HashSet<Integer>(); Set<Integer> exclude_0_8 = new HashSet<Integer>(); @@ -343,6 +351,7 @@ public class Main parsePortList(ports, serverConfig.getPorts()); parsePortList(exclude_0_10, serverConfig.getPortExclude010()); + parsePortList(exclude_0_9_1, serverConfig.getPortExclude091()); parsePortList(exclude_0_9, serverConfig.getPortExclude09()); parsePortList(exclude_0_8, serverConfig.getPortExclude08()); @@ -351,6 +360,7 @@ public class Main { parsePortArray(ports, portStr); parsePortArray(exclude_0_10, commandLine.getOptionValues("exclude-0-10")); + parsePortArray(exclude_0_9_1, commandLine.getOptionValues("exclude-0-9-1")); parsePortArray(exclude_0_9, commandLine.getOptionValues("exclude-0-9")); parsePortArray(exclude_0_8, commandLine.getOptionValues("exclude-0-8")); @@ -399,6 +409,11 @@ public class Main { supported.remove(VERSION.v0_10); } + + if(exclude_0_9_1.contains(port)) + { + supported.remove(VERSION.v0_9_1); + } if(exclude_0_9.contains(port)) { supported.remove(VERSION.v0_9); diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index 66a7279134..879eb7c9e6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -538,6 +538,11 @@ public class ServerConfiguration implements SignalHandler return getConfig().getList("connector.non010port", Collections.EMPTY_LIST); } + public List getPortExclude091() + { + return getConfig().getList("connector.non091port", Collections.EMPTY_LIST); + } + public List getPortExclude09() { return getConfig().getList("connector.non09port", Collections.EMPTY_LIST); diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java index 3a94160e22..dbefeb61f2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java @@ -45,7 +45,6 @@ public class ProtocolOutputConverterRegistry register(ProtocolVersion.v8_0, org.apache.qpid.server.output.amqp0_8.ProtocolOutputConverterImpl.getInstanceFactory());
register(ProtocolVersion.v0_9, org.apache.qpid.server.output.amqp0_9.ProtocolOutputConverterImpl.getInstanceFactory());
register(ProtocolVersion.v0_91, org.apache.qpid.server.output.amqp0_9_1.ProtocolOutputConverterImpl.getInstanceFactory());
-
}
private static void register(ProtocolVersion version, Factory converter)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 78e21a8f14..9a1c6c9418 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -135,7 +135,7 @@ private static final byte[] AMQP_0_9_1_HEADER = (byte) 'M', (byte) 'Q', (byte) 'P', - (byte) 1, + (byte) 0, (byte) 0, (byte) 9, (byte) 1 @@ -250,6 +250,59 @@ private static final byte[] AMQP_0_9_1_HEADER = new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 }; + private class ClosedDelegateProtocolEngine implements ProtocolEngine + { + public void setNetworkDriver(NetworkDriver driver) + { + _networkDriver = driver; + } + + public SocketAddress getRemoteAddress() + { + return _networkDriver.getRemoteAddress(); + } + + public SocketAddress getLocalAddress() + { + return _networkDriver.getLocalAddress(); + } + + public long getWrittenBytes() + { + return 0; + } + + public long getReadBytes() + { + return 0; + } + + public void received(ByteBuffer msg) + { + _logger.error("Error processing incoming data, could not negotiate a common protocol"); + } + + public void exception(Throwable t) + { + _logger.error("Error establishing session", t); + } + + public void closed() + { + + } + + public void writerIdle() + { + + } + + public void readerIdle() + { + + } + } + private class SelfDelegateProtocolEngine implements ProtocolEngine { @@ -303,12 +356,14 @@ private static final byte[] AMQP_0_9_1_HEADER = ProtocolEngine newDelegate = null; + byte[] newestSupported = null; for(int i = 0; newDelegate == null && i < _creators.length; i++) { if(_supported.contains(_creators[i].getVersion())) { + newestSupported = _creators[i].getHeaderIdentifier(); byte[] compareBytes = _creators[i].getHeaderIdentifier(); boolean equal = true; for(int j = 0; equal && j<compareBytes.length; j++) @@ -319,24 +374,28 @@ private static final byte[] AMQP_0_9_1_HEADER = { newDelegate = _creators[i].getProtocolEngine(); } - - } } - // let the first delegate handle completely unknown versions + + // If no delegate is found then send back the most recent support protocol version id if(newDelegate == null) { - newDelegate = _creators[0].getProtocolEngine(); + _networkDriver.send(ByteBuffer.wrap(newestSupported)); + + newDelegate = new ClosedDelegateProtocolEngine(); } - newDelegate.setNetworkDriver(_networkDriver); + else + { + newDelegate.setNetworkDriver(_networkDriver); - _delegate = newDelegate; + _delegate = newDelegate; - _header.flip(); - _delegate.received(_header); - if(msg.hasRemaining()) - { - _delegate.received(msg); + _header.flip(); + _delegate.received(_header); + if(msg.hasRemaining()) + { + _delegate.received(msg); + } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 6dfb70fe28..0b9be5951f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -308,7 +308,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** Thread Pool for executing connection level processes. Such as returning bounced messages. */ private final ExecutorService _taskPool = Executors.newCachedThreadPool(); private static final long DEFAULT_TIMEOUT = 1000 * 30; - private ProtocolVersion _protocolVersion = ProtocolVersion.v0_91; // FIXME TGM, shouldn't need this protected AMQConnectionDelegate _delegate; @@ -458,9 +457,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _failoverPolicy = new FailoverPolicy(connectionURL, this); BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails(); - if (brokerDetails.getTransport().equals(BrokerDetails.VM) || "0-8".equals(amqpVersion) || "0-9".equals(amqpVersion)) + if (brokerDetails.getTransport().equals(BrokerDetails.VM) || "0-8".equals(amqpVersion)) { _delegate = new AMQConnectionDelegate_8_0(this); + } + else if ("0-9".equals(amqpVersion)) + { + _delegate = new AMQConnectionDelegate_0_9(this); + } + else if ("0-91".equals(amqpVersion) || "0-9-1".equals(amqpVersion)) + { + _delegate = new AMQConnectionDelegate_9_1(this); } else { @@ -1541,13 +1548,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public ProtocolVersion getProtocolVersion() { - return _protocolVersion; - } - - public void setProtocolVersion(ProtocolVersion protocolVersion) - { - _protocolVersion = protocolVersion; - _protocolHandler.getProtocolSession().setProtocolVersion(protocolVersion); + return _delegate.getProtocolVersion(); } public boolean isFailingOver() diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index e6c3473cb1..23dc244dee 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -61,4 +61,6 @@ public interface AMQConnectionDelegate void setIdleTimeout(long l); int getMaxChannelID(); + + ProtocolVersion getProtocolVersion(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 4d10180667..af21eb7ed0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -301,4 +301,9 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { return Integer.MAX_VALUE; } + + public ProtocolVersion getProtocolVersion() + { + return ProtocolVersion.v0_10; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_9.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_9.java index d95e2e3dff..70ecedfd8b 100755 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_9.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_9.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.client; +import org.apache.qpid.framing.ProtocolVersion; + public class AMQConnectionDelegate_0_9 extends AMQConnectionDelegate_8_0 { @@ -28,5 +30,11 @@ public class AMQConnectionDelegate_0_9 extends AMQConnectionDelegate_8_0 { super(conn); } + + @Override + public ProtocolVersion getProtocolVersion() + { + return ProtocolVersion.v0_9; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index e1d9ae735c..6f44f68b37 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -107,9 +107,13 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { _conn._failoverPolicy.attainedConnection(); _conn._connected = true; + return null; + } + else + { + return _conn._protocolHandler.getSuggestedProtocolVersion(); } - return null; } public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch) @@ -306,4 +310,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { return (int) (Math.pow(2, 16)-1); } + + public ProtocolVersion getProtocolVersion() + { + return ProtocolVersion.v8_0; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java index 1bb93f66a3..442dd7b286 100755 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.client; +import org.apache.qpid.framing.ProtocolVersion; + public class AMQConnectionDelegate_9_1 extends AMQConnectionDelegate_8_0 { @@ -29,4 +31,9 @@ public class AMQConnectionDelegate_9_1 extends AMQConnectionDelegate_8_0 super(conn); } + @Override + public ProtocolVersion getProtocolVersion() + { + return ProtocolVersion.v0_91; + } }
\ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 505febd42c..a567c2c215 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -171,6 +171,7 @@ public class AMQProtocolHandler implements ProtocolEngine private Job _writeJob; private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance(); private NetworkDriver _networkDriver; + private ProtocolVersion _suggestedProtocolVersion; private long _writtenBytes; private long _readBytes; @@ -427,6 +428,7 @@ public class AMQProtocolHandler implements ProtocolEngine Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable() { + public void run() { // Decode buffer @@ -467,9 +469,8 @@ public class AMQProtocolHandler implements ProtocolEngine // suggesting an alternate ProtocolVersion; the server will then close the // connection. ProtocolInitiation protocolInit = (ProtocolInitiation) message; - ProtocolVersion pv = protocolInit.checkVersion(); - getConnection().setProtocolVersion(pv); - + _suggestedProtocolVersion = protocolInit.checkVersion(); + // get round a bug in old versions of qpid whereby the connection is not closed _stateManager.changeState(AMQState.CONNECTION_CLOSED); } @@ -845,4 +846,10 @@ public class AMQProtocolHandler implements ProtocolEngine { return _networkDriver; } + + public ProtocolVersion getSuggestedProtocolVersion() + { + return _suggestedProtocolVersion; + } + } diff --git a/java/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java b/java/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java index fa0c1e9c63..00ea55ff96 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java @@ -39,13 +39,15 @@ public final class ProtocolHeader implements NetworkEvent, ProtocolEvent private static final byte[] AMQP = {'A', 'M', 'Q', 'P' }; private static final byte CLASS = 1; + final private byte protoClass; final private byte instance; final private byte major; final private byte minor; private int channel; - public ProtocolHeader(byte instance, byte major, byte minor) + public ProtocolHeader(byte protoClass, byte instance, byte major, byte minor) { + this.protoClass = protoClass; this.instance = instance; this.major = major; this.minor = minor; @@ -53,7 +55,7 @@ public final class ProtocolHeader implements NetworkEvent, ProtocolEvent public ProtocolHeader(int instance, int major, int minor) { - this((byte) instance, (byte) major, (byte) minor); + this(CLASS, (byte) instance, (byte) major, (byte) minor); } public byte getInstance() @@ -90,7 +92,7 @@ public final class ProtocolHeader implements NetworkEvent, ProtocolEvent { ByteBuffer buf = ByteBuffer.allocate(8); buf.put(AMQP); - buf.put(CLASS); + buf.put(protoClass); buf.put(instance); buf.put(major); buf.put(minor); diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java b/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java index 2132fc2c03..a2885f97bc 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java @@ -144,10 +144,11 @@ public class InputHandler implements Receiver<ByteBuffer> return ERROR; } + byte protoClass = input.get(pos + 4); byte instance = input.get(pos + 5); byte major = input.get(pos + 6); byte minor = input.get(pos + 7); - receiver.received(new ProtocolHeader(instance, major, minor)); + receiver.received(new ProtocolHeader(protoClass, instance, major, minor)); needed = Frame.HEADER_SIZE; return FRAME_HDR; case FRAME_HDR: diff --git a/java/common/templates/model/ProtocolVersionListClass.vm b/java/common/templates/model/ProtocolVersionListClass.vm index 110342647e..78605c70ff 100644 --- a/java/common/templates/model/ProtocolVersionListClass.vm +++ b/java/common/templates/model/ProtocolVersionListClass.vm @@ -149,15 +149,20 @@ public class ProtocolVersion implements Comparable private static final ProtocolVersion _defaultVersion; + public static final ProtocolVersion v0_10 = new ProtocolVersion((byte)0,(byte)10); + #foreach( $version in $model.getVersionSet() ) #set( $versionId = "v$version.getMajor()_$version.getMinor()" ) - public static final ProtocolVersion $versionId = new ProtocolVersion((byte)$version.getMajor(),(byte)$version.getMinor()); + public static final ProtocolVersion $versionId = new ProtocolVersion((byte)$version.getMajor(),(byte)$version.getMinor()); #end + static { SortedSet<ProtocolVersion> versions = new TreeSet<ProtocolVersion>(); + versions.add(v0_10); + _nameToVersionMap.put("0-10", v0_10); #foreach( $version in $model.getVersionSet() ) #set( $versionId = "v$version.getMajor()_$version.getMinor()" ) versions.add($versionId); |