summaryrefslogtreecommitdiff
path: root/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java')
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java50
1 files changed, 33 insertions, 17 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
index 3ac17e9204..ac21fe4243 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
@@ -20,12 +20,10 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.qpid.AMQException;
import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock
{
@@ -53,13 +51,16 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
_protocolMajor = protocolMajor;
_protocolMinor = protocolMinor;
}
-
+
public ProtocolInitiation(ProtocolVersion pv)
{
- this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, TCP_PROTOCOL_INSTANCE, pv.getMajorVersion(), pv.getMinorVersion());
+ this(AMQP_HEADER,
+ pv.equals(ProtocolVersion.v0_91) ? 0 : CURRENT_PROTOCOL_CLASS,
+ pv.equals(ProtocolVersion.v0_91) ? 0 : TCP_PROTOCOL_INSTANCE,
+ pv.equals(ProtocolVersion.v0_91) ? 9 : pv.getMajorVersion(),
+ pv.equals(ProtocolVersion.v0_91) ? 1 : pv.getMinorVersion());
}
-
public ProtocolInitiation(ByteBuffer in)
{
_protocolHeader = new byte[4];
@@ -71,6 +72,11 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
_protocolMinor = in.get();
}
+ public void writePayload(org.apache.mina.common.ByteBuffer buffer)
+ {
+ writePayload(buffer.buf());
+ }
+
public long getSize()
{
return 4 + 1 + 1 + 1 + 1;
@@ -122,21 +128,15 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
{
/**
*
- * @param session the session
* @param in input buffer
* @return true if we have enough data to decode the PI frame fully, false if more
* data is required
*/
- public boolean decodable(IoSession session, ByteBuffer in)
+ public boolean decodable(ByteBuffer in)
{
return (in.remaining() >= 8);
}
- public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out)
- {
- ProtocolInitiation pi = new ProtocolInitiation(in);
- out.write(pi);
- }
}
public ProtocolVersion checkVersion() throws AMQException
@@ -160,18 +160,33 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
}
}
}
- if (_protocolClass != CURRENT_PROTOCOL_CLASS)
+
+ ProtocolVersion pv;
+
+ // Hack for 0-9-1 which changed how the header was defined
+ if(_protocolInstance == 0 && _protocolMajor == 9 && _protocolMinor == 1)
+ {
+ pv = ProtocolVersion.v0_91;
+ if (_protocolClass != 0)
+ {
+ throw new AMQProtocolClassException("Protocol class " + 0 + " was expected; received " +
+ _protocolClass, null);
+ }
+ }
+ else if (_protocolClass != CURRENT_PROTOCOL_CLASS)
{
throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS + " was expected; received " +
_protocolClass, null);
}
- if (_protocolInstance != TCP_PROTOCOL_INSTANCE)
+ else if (_protocolInstance != TCP_PROTOCOL_INSTANCE)
{
throw new AMQProtocolInstanceException("Protocol instance " + TCP_PROTOCOL_INSTANCE + " was expected; received " +
_protocolInstance, null);
}
-
- ProtocolVersion pv = new ProtocolVersion(_protocolMajor, _protocolMinor);
+ else
+ {
+ pv = new ProtocolVersion(_protocolMajor, _protocolMinor);
+ }
if (!pv.isSupported())
@@ -192,4 +207,5 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
buffer.append(Integer.toHexString(_protocolMinor));
return buffer.toString();
}
+
}