diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java')
-rwxr-xr-x | qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java | 83 |
1 files changed, 71 insertions, 12 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 78e21a8f14..9a1c6c9418 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -135,7 +135,7 @@ private static final byte[] AMQP_0_9_1_HEADER = (byte) 'M', (byte) 'Q', (byte) 'P', - (byte) 1, + (byte) 0, (byte) 0, (byte) 9, (byte) 1 @@ -250,6 +250,59 @@ private static final byte[] AMQP_0_9_1_HEADER = new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 }; + private class ClosedDelegateProtocolEngine implements ProtocolEngine + { + public void setNetworkDriver(NetworkDriver driver) + { + _networkDriver = driver; + } + + public SocketAddress getRemoteAddress() + { + return _networkDriver.getRemoteAddress(); + } + + public SocketAddress getLocalAddress() + { + return _networkDriver.getLocalAddress(); + } + + public long getWrittenBytes() + { + return 0; + } + + public long getReadBytes() + { + return 0; + } + + public void received(ByteBuffer msg) + { + _logger.error("Error processing incoming data, could not negotiate a common protocol"); + } + + public void exception(Throwable t) + { + _logger.error("Error establishing session", t); + } + + public void closed() + { + + } + + public void writerIdle() + { + + } + + public void readerIdle() + { + + } + } + private class SelfDelegateProtocolEngine implements ProtocolEngine { @@ -303,12 +356,14 @@ private static final byte[] AMQP_0_9_1_HEADER = ProtocolEngine newDelegate = null; + byte[] newestSupported = null; for(int i = 0; newDelegate == null && i < _creators.length; i++) { if(_supported.contains(_creators[i].getVersion())) { + newestSupported = _creators[i].getHeaderIdentifier(); byte[] compareBytes = _creators[i].getHeaderIdentifier(); boolean equal = true; for(int j = 0; equal && j<compareBytes.length; j++) @@ -319,24 +374,28 @@ private static final byte[] AMQP_0_9_1_HEADER = { newDelegate = _creators[i].getProtocolEngine(); } - - } } - // let the first delegate handle completely unknown versions + + // If no delegate is found then send back the most recent support protocol version id if(newDelegate == null) { - newDelegate = _creators[0].getProtocolEngine(); + _networkDriver.send(ByteBuffer.wrap(newestSupported)); + + newDelegate = new ClosedDelegateProtocolEngine(); } - newDelegate.setNetworkDriver(_networkDriver); + else + { + newDelegate.setNetworkDriver(_networkDriver); - _delegate = newDelegate; + _delegate = newDelegate; - _header.flip(); - _delegate.received(_header); - if(msg.hasRemaining()) - { - _delegate.received(msg); + _header.flip(); + _delegate.received(_header); + if(msg.hasRemaining()) + { + _delegate.received(msg); + } } } |