diff options
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java')
-rw-r--r-- | java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java | 50 |
1 files changed, 33 insertions, 17 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java index 3ac17e9204..ac21fe4243 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java @@ -20,12 +20,10 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IoSession; -import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.qpid.AMQException; import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock { @@ -53,13 +51,16 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData _protocolMajor = protocolMajor; _protocolMinor = protocolMinor; } - + public ProtocolInitiation(ProtocolVersion pv) { - this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, TCP_PROTOCOL_INSTANCE, pv.getMajorVersion(), pv.getMinorVersion()); + this(AMQP_HEADER, + pv.equals(ProtocolVersion.v0_91) ? 0 : CURRENT_PROTOCOL_CLASS, + pv.equals(ProtocolVersion.v0_91) ? 0 : TCP_PROTOCOL_INSTANCE, + pv.equals(ProtocolVersion.v0_91) ? 9 : pv.getMajorVersion(), + pv.equals(ProtocolVersion.v0_91) ? 1 : pv.getMinorVersion()); } - public ProtocolInitiation(ByteBuffer in) { _protocolHeader = new byte[4]; @@ -71,6 +72,11 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData _protocolMinor = in.get(); } + public void writePayload(org.apache.mina.common.ByteBuffer buffer) + { + writePayload(buffer.buf()); + } + public long getSize() { return 4 + 1 + 1 + 1 + 1; @@ -122,21 +128,15 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData { /** * - * @param session the session * @param in input buffer * @return true if we have enough data to decode the PI frame fully, false if more * data is required */ - public boolean decodable(IoSession session, ByteBuffer in) + public boolean decodable(ByteBuffer in) { return (in.remaining() >= 8); } - public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) - { - ProtocolInitiation pi = new ProtocolInitiation(in); - out.write(pi); - } } public ProtocolVersion checkVersion() throws AMQException @@ -160,18 +160,33 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData } } } - if (_protocolClass != CURRENT_PROTOCOL_CLASS) + + ProtocolVersion pv; + + // Hack for 0-9-1 which changed how the header was defined + if(_protocolInstance == 0 && _protocolMajor == 9 && _protocolMinor == 1) + { + pv = ProtocolVersion.v0_91; + if (_protocolClass != 0) + { + throw new AMQProtocolClassException("Protocol class " + 0 + " was expected; received " + + _protocolClass, null); + } + } + else if (_protocolClass != CURRENT_PROTOCOL_CLASS) { throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS + " was expected; received " + _protocolClass, null); } - if (_protocolInstance != TCP_PROTOCOL_INSTANCE) + else if (_protocolInstance != TCP_PROTOCOL_INSTANCE) { throw new AMQProtocolInstanceException("Protocol instance " + TCP_PROTOCOL_INSTANCE + " was expected; received " + _protocolInstance, null); } - - ProtocolVersion pv = new ProtocolVersion(_protocolMajor, _protocolMinor); + else + { + pv = new ProtocolVersion(_protocolMajor, _protocolMinor); + } if (!pv.isSupported()) @@ -192,4 +207,5 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData buffer.append(Integer.toHexString(_protocolMinor)); return buffer.toString(); } + } |