summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java')
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java83
1 files changed, 71 insertions, 12 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index 78e21a8f14..9a1c6c9418 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -135,7 +135,7 @@ private static final byte[] AMQP_0_9_1_HEADER =
(byte) 'M',
(byte) 'Q',
(byte) 'P',
- (byte) 1,
+ (byte) 0,
(byte) 0,
(byte) 9,
(byte) 1
@@ -250,6 +250,59 @@ private static final byte[] AMQP_0_9_1_HEADER =
new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 };
+ private class ClosedDelegateProtocolEngine implements ProtocolEngine
+ {
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _networkDriver = driver;
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _networkDriver.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _networkDriver.getLocalAddress();
+ }
+
+ public long getWrittenBytes()
+ {
+ return 0;
+ }
+
+ public long getReadBytes()
+ {
+ return 0;
+ }
+
+ public void received(ByteBuffer msg)
+ {
+ _logger.error("Error processing incoming data, could not negotiate a common protocol");
+ }
+
+ public void exception(Throwable t)
+ {
+ _logger.error("Error establishing session", t);
+ }
+
+ public void closed()
+ {
+
+ }
+
+ public void writerIdle()
+ {
+
+ }
+
+ public void readerIdle()
+ {
+
+ }
+ }
+
private class SelfDelegateProtocolEngine implements ProtocolEngine
{
@@ -303,12 +356,14 @@ private static final byte[] AMQP_0_9_1_HEADER =
ProtocolEngine newDelegate = null;
+ byte[] newestSupported = null;
for(int i = 0; newDelegate == null && i < _creators.length; i++)
{
if(_supported.contains(_creators[i].getVersion()))
{
+ newestSupported = _creators[i].getHeaderIdentifier();
byte[] compareBytes = _creators[i].getHeaderIdentifier();
boolean equal = true;
for(int j = 0; equal && j<compareBytes.length; j++)
@@ -319,24 +374,28 @@ private static final byte[] AMQP_0_9_1_HEADER =
{
newDelegate = _creators[i].getProtocolEngine();
}
-
-
}
}
- // let the first delegate handle completely unknown versions
+
+ // If no delegate is found then send back the most recent support protocol version id
if(newDelegate == null)
{
- newDelegate = _creators[0].getProtocolEngine();
+ _networkDriver.send(ByteBuffer.wrap(newestSupported));
+
+ newDelegate = new ClosedDelegateProtocolEngine();
}
- newDelegate.setNetworkDriver(_networkDriver);
+ else
+ {
+ newDelegate.setNetworkDriver(_networkDriver);
- _delegate = newDelegate;
+ _delegate = newDelegate;
- _header.flip();
- _delegate.received(_header);
- if(msg.hasRemaining())
- {
- _delegate.received(msg);
+ _header.flip();
+ _delegate.received(_header);
+ if(msg.hasRemaining())
+ {
+ _delegate.received(msg);
+ }
}
}