summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/Main.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java1
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java83
4 files changed, 91 insertions, 13 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java
index 845983857c..90afd2e4ac 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -140,6 +140,12 @@ public class Main
.withDescription("when listening on the specified port do not accept AMQP0-10 connections. The specified port must be one specified on the command line")
.withLongOpt("exclude-0-10").create();
+ Option exclude0_9_1 =
+ OptionBuilder.withArgName("exclude-0-9-1").hasArg()
+ .withDescription("when listening on the specified port do not accept AMQP0-9-1 connections. The specified port must be one specified on the command line")
+ .withLongOpt("exclude-0-9-1").create();
+
+
Option exclude0_9 =
OptionBuilder.withArgName("exclude-0-9").hasArg()
.withDescription("when listening on the specified port do not accept AMQP0-9 connections. The specified port must be one specified on the command line")
@@ -179,6 +185,7 @@ public class Main
options.addOption(logwatchconfig);
options.addOption(port);
options.addOption(exclude0_10);
+ options.addOption(exclude0_9_1);
options.addOption(exclude0_9);
options.addOption(exclude0_8);
options.addOption(mport);
@@ -335,6 +342,7 @@ public class Main
Set<Integer> ports = new HashSet<Integer>();
Set<Integer> exclude_0_10 = new HashSet<Integer>();
+ Set<Integer> exclude_0_9_1 = new HashSet<Integer>();
Set<Integer> exclude_0_9 = new HashSet<Integer>();
Set<Integer> exclude_0_8 = new HashSet<Integer>();
@@ -343,6 +351,7 @@ public class Main
parsePortList(ports, serverConfig.getPorts());
parsePortList(exclude_0_10, serverConfig.getPortExclude010());
+ parsePortList(exclude_0_9_1, serverConfig.getPortExclude091());
parsePortList(exclude_0_9, serverConfig.getPortExclude09());
parsePortList(exclude_0_8, serverConfig.getPortExclude08());
@@ -351,6 +360,7 @@ public class Main
{
parsePortArray(ports, portStr);
parsePortArray(exclude_0_10, commandLine.getOptionValues("exclude-0-10"));
+ parsePortArray(exclude_0_9_1, commandLine.getOptionValues("exclude-0-9-1"));
parsePortArray(exclude_0_9, commandLine.getOptionValues("exclude-0-9"));
parsePortArray(exclude_0_8, commandLine.getOptionValues("exclude-0-8"));
@@ -399,6 +409,11 @@ public class Main
{
supported.remove(VERSION.v0_10);
}
+
+ if(exclude_0_9_1.contains(port))
+ {
+ supported.remove(VERSION.v0_9_1);
+ }
if(exclude_0_9.contains(port))
{
supported.remove(VERSION.v0_9);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index 66a7279134..879eb7c9e6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -538,6 +538,11 @@ public class ServerConfiguration implements SignalHandler
return getConfig().getList("connector.non010port", Collections.EMPTY_LIST);
}
+ public List getPortExclude091()
+ {
+ return getConfig().getList("connector.non091port", Collections.EMPTY_LIST);
+ }
+
public List getPortExclude09()
{
return getConfig().getList("connector.non09port", Collections.EMPTY_LIST);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java
index 3a94160e22..dbefeb61f2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java
@@ -45,7 +45,6 @@ public class ProtocolOutputConverterRegistry
register(ProtocolVersion.v8_0, org.apache.qpid.server.output.amqp0_8.ProtocolOutputConverterImpl.getInstanceFactory());
register(ProtocolVersion.v0_9, org.apache.qpid.server.output.amqp0_9.ProtocolOutputConverterImpl.getInstanceFactory());
register(ProtocolVersion.v0_91, org.apache.qpid.server.output.amqp0_9_1.ProtocolOutputConverterImpl.getInstanceFactory());
-
}
private static void register(ProtocolVersion version, Factory converter)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index 78e21a8f14..9a1c6c9418 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -135,7 +135,7 @@ private static final byte[] AMQP_0_9_1_HEADER =
(byte) 'M',
(byte) 'Q',
(byte) 'P',
- (byte) 1,
+ (byte) 0,
(byte) 0,
(byte) 9,
(byte) 1
@@ -250,6 +250,59 @@ private static final byte[] AMQP_0_9_1_HEADER =
new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 };
+ private class ClosedDelegateProtocolEngine implements ProtocolEngine
+ {
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _networkDriver = driver;
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _networkDriver.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _networkDriver.getLocalAddress();
+ }
+
+ public long getWrittenBytes()
+ {
+ return 0;
+ }
+
+ public long getReadBytes()
+ {
+ return 0;
+ }
+
+ public void received(ByteBuffer msg)
+ {
+ _logger.error("Error processing incoming data, could not negotiate a common protocol");
+ }
+
+ public void exception(Throwable t)
+ {
+ _logger.error("Error establishing session", t);
+ }
+
+ public void closed()
+ {
+
+ }
+
+ public void writerIdle()
+ {
+
+ }
+
+ public void readerIdle()
+ {
+
+ }
+ }
+
private class SelfDelegateProtocolEngine implements ProtocolEngine
{
@@ -303,12 +356,14 @@ private static final byte[] AMQP_0_9_1_HEADER =
ProtocolEngine newDelegate = null;
+ byte[] newestSupported = null;
for(int i = 0; newDelegate == null && i < _creators.length; i++)
{
if(_supported.contains(_creators[i].getVersion()))
{
+ newestSupported = _creators[i].getHeaderIdentifier();
byte[] compareBytes = _creators[i].getHeaderIdentifier();
boolean equal = true;
for(int j = 0; equal && j<compareBytes.length; j++)
@@ -319,24 +374,28 @@ private static final byte[] AMQP_0_9_1_HEADER =
{
newDelegate = _creators[i].getProtocolEngine();
}
-
-
}
}
- // let the first delegate handle completely unknown versions
+
+ // If no delegate is found then send back the most recent support protocol version id
if(newDelegate == null)
{
- newDelegate = _creators[0].getProtocolEngine();
+ _networkDriver.send(ByteBuffer.wrap(newestSupported));
+
+ newDelegate = new ClosedDelegateProtocolEngine();
}
- newDelegate.setNetworkDriver(_networkDriver);
+ else
+ {
+ newDelegate.setNetworkDriver(_networkDriver);
- _delegate = newDelegate;
+ _delegate = newDelegate;
- _header.flip();
- _delegate.received(_header);
- if(msg.hasRemaining())
- {
- _delegate.received(msg);
+ _header.flip();
+ _delegate.received(_header);
+ if(msg.hasRemaining())
+ {
+ _delegate.received(msg);
+ }
}
}