summaryrefslogtreecommitdiff
path: root/java/common/src/main
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2007-03-22 13:14:42 +0000
committerRobert Godfrey <rgodfrey@apache.org>2007-03-22 13:14:42 +0000
commit3fb9be28593263e12623ce09084a230b59b81f4f (patch)
tree8de74dd781802819df0ff1ca56aaa94fa1b9b38e /java/common/src/main
parentb9f9c16645933e0e2f4c6c9b58e8cd1716434467 (diff)
downloadqpid-python-3fb9be28593263e12623ce09084a230b59b81f4f.tar.gz
made a copy
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/java.multi_version@521253 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src/main')
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQChannelException.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQConnectionException.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java16
-rw-r--r--java/common/src/main/java/org/apache/qpid/common/ClientProperties.java20
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQBody.java21
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQBodyImpl.java27
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java133
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java102
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodFactory.java90
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java125
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java65
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentBody.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/FieldTable.java68
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/MainRegistry.java35
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java22
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java155
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java160
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodFactory_8_0.java117
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java1
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java1
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java5
35 files changed, 946 insertions, 269 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
index d8c9b287bd..12120bd10d 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
@@ -23,6 +23,8 @@ package org.apache.qpid;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.amqp_8_0.ConnectionCloseBodyImpl;
+import org.apache.qpid.framing.amqp_8_0.ChannelCloseBodyImpl;
import org.apache.qpid.protocol.AMQConstant;
public class AMQChannelException extends AMQException
@@ -53,6 +55,6 @@ public class AMQChannelException extends AMQException
public AMQFrame getCloseFrame(int channel)
{
- return ChannelCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode().getCode(), new AMQShortString(getMessage()));
+ return new AMQFrame(channel, new ChannelCloseBodyImpl(getErrorCode().getCode(), new AMQShortString(getMessage()),0,0));
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
index c4f80191a3..094e26802d 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
@@ -24,6 +24,7 @@ package org.apache.qpid;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.amqp_8_0.ConnectionCloseBodyImpl;
import org.apache.qpid.protocol.AMQConstant;
public class AMQConnectionException extends AMQException
@@ -57,7 +58,7 @@ public class AMQConnectionException extends AMQException
public AMQFrame getCloseFrame(int channel)
{
- return ConnectionCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode().getCode(), new AMQShortString(getMessage()));
+ return new AMQFrame(channel, new ConnectionCloseBodyImpl(getErrorCode().getCode(), new AMQShortString(getMessage()),_classId,_methodId));
}
diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
index bb981a242f..6e0a5c3786 100644
--- a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
+++ b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
@@ -48,13 +48,21 @@ public class AMQDecoder extends CumulativeProtocolDecoder
protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
{
- if (_expectProtocolInitiation)
+ try
{
- return doDecodePI(session, in, out);
+ if (_expectProtocolInitiation)
+ {
+ return doDecodePI(session, in, out);
+ }
+ else
+ {
+ return doDecodeDataBlock(session, in, out);
+ }
}
- else
+ catch (Exception e)
{
- return doDecodeDataBlock(session, in, out);
+ e.printStackTrace();
+ throw e;
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
index 07371b5182..1f1911aa35 100644
--- a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
+++ b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
@@ -20,10 +20,28 @@
*/
package org.apache.qpid.common;
+import org.apache.qpid.framing.AMQShortString;
+
public enum ClientProperties
{
+
+
instance,
product,
version,
- platform
+ platform;
+
+
+ private final AMQShortString _name;
+
+ private ClientProperties()
+ {
+ _name = new AMQShortString(toString());
+ }
+
+ public AMQShortString getName()
+ {
+ return _name;
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
index ebeea8d2b4..4dd5ab7a9a 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,18 +22,15 @@ package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
-public abstract class AMQBody
+public interface AMQBody
{
- public abstract byte getFrameType();
-
- /**
+ byte getFrameType();
+
+ /**
* Get the size of the body
* @return unsigned short
*/
- protected abstract int getSize();
-
- protected abstract void writePayload(ByteBuffer buffer);
-
- protected abstract void populateFromBuffer(ByteBuffer buffer, long size)
- throws AMQFrameDecodingException, AMQProtocolVersionException;
+ int getSize();
+
+ void writePayload(ByteBuffer buffer);
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQBodyImpl.java b/java/common/src/main/java/org/apache/qpid/framing/AMQBodyImpl.java
new file mode 100644
index 0000000000..6b2d1feae5
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQBodyImpl.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+
+public abstract class AMQBodyImpl implements AMQBody
+{
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
index 43f888c029..2ecd4d4650 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
@@ -72,7 +72,7 @@ public class AMQDataBlockDecoder
final byte type = in.get();
BodyFactory bodyFactory;
- if(type == AMQMethodBody.TYPE)
+ if(type == AMQMethodBodyImpl.TYPE)
{
bodyFactory = (BodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY);
if(bodyFactory == null)
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
index 111d9a8f20..d61c1c3d36 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
@@ -1,132 +1,25 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.protocol.AMQConstant;
-public abstract class AMQMethodBody extends AMQBody
+/**
+ * Created by IntelliJ IDEA.
+ * User: U146758
+ * Date: 08-Mar-2007
+ * Time: 11:30:28
+ * To change this template use File | Settings | File Templates.
+ */
+public interface AMQMethodBody extends AMQBody
{
- public static final byte TYPE = 1;
-
- /** AMQP version */
- protected byte major;
- protected byte minor;
-
- public byte getMajor()
- {
- return major;
- }
-
- public byte getMinor()
- {
- return minor;
- }
-
- public AMQMethodBody(byte major, byte minor)
- {
- this.major = major;
- this.minor = minor;
- }
-
- /** unsigned short */
- protected abstract int getBodySize();
-
- /** @return unsigned short */
- protected abstract int getClazz();
-
- /** @return unsigned short */
- protected abstract int getMethod();
-
- protected abstract void writeMethodPayload(ByteBuffer buffer);
-
- public byte getFrameType()
- {
- return TYPE;
- }
-
- protected int getSize()
- {
- return 2 + 2 + getBodySize();
- }
-
- protected void writePayload(ByteBuffer buffer)
- {
- EncodingUtils.writeUnsignedShort(buffer, getClazz());
- EncodingUtils.writeUnsignedShort(buffer, getMethod());
- writeMethodPayload(buffer);
- }
-
- protected abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException;
-
- protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
- {
- populateMethodBodyFromBuffer(buffer);
- }
-
- public String toString()
- {
- StringBuffer buf = new StringBuffer(getClass().toString());
- buf.append(" Class: ").append(getClazz());
- buf.append(" Method: ").append(getMethod());
- return buf.toString();
- }
-
- /**
- * Creates an AMQChannelException for the corresponding body type (a channel exception should include the class and
- * method ids of the body it resulted from).
- */
-
- /**
- * Convenience Method to create a channel not found exception
- *
- * @param channelId The channel id that is not found
- *
- * @return new AMQChannelException
- */
- public AMQChannelException getChannelNotFoundException(int channelId)
- {
- return getChannelException(AMQConstant.NOT_FOUND, "Channel not found for id:" + channelId);
- }
-
- public AMQChannelException getChannelException(AMQConstant code, String message)
- {
- return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor);
- }
+ AMQChannelException getChannelNotFoundException(int channelId);
- public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause)
- {
- return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor, cause);
- }
+ AMQChannelException getChannelException(AMQConstant code, String message);
- public AMQConnectionException getConnectionException(AMQConstant code, String message)
- {
- return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor);
- }
+ AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause);
- public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause)
- {
- return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor, cause);
- }
+ AMQConnectionException getConnectionException(AMQConstant code, String message);
+ AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause);
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
index 5293c00379..f5cd971c0e 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
@@ -37,6 +37,6 @@ public class AMQMethodBodyFactory implements BodyFactory
public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
{
- return _protocolSession.getRegistry().get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(), in, bodySize);
+ return _protocolSession.getRegistry().convertToBody(in, bodySize);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
new file mode 100644
index 0000000000..1951970a72
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.protocol.AMQConstant;
+
+public abstract class AMQMethodBodyImpl extends AMQBodyImpl implements AMQMethodBody
+{
+ public static final byte TYPE = 1;
+
+
+ public abstract byte getMajor();
+ public abstract byte getMinor();
+
+ /** unsigned short */
+ protected abstract int getBodySize();
+
+ /** @return unsigned short */
+ protected abstract int getClazz();
+
+ /** @return unsigned short */
+ protected abstract int getMethod();
+
+ protected abstract void writeMethodPayload(ByteBuffer buffer);
+
+ public byte getFrameType()
+ {
+ return TYPE;
+ }
+
+ public int getSize()
+ {
+ return 2 + 2 + getBodySize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ EncodingUtils.writeUnsignedShort(buffer, getClazz());
+ EncodingUtils.writeUnsignedShort(buffer, getMethod());
+ writeMethodPayload(buffer);
+ }
+
+
+ /**
+ * Creates an AMQChannelException for the corresponding body type (a channel exception should include the class and
+ * method ids of the body it resulted from).
+ */
+
+ /**
+ * Convenience Method to create a channel not found exception
+ *
+ * @param channelId The channel id that is not found
+ *
+ * @return new AMQChannelException
+ */
+ public AMQChannelException getChannelNotFoundException(int channelId)
+ {
+ return getChannelException(AMQConstant.NOT_FOUND, "Channel not found for id:" + channelId);
+ }
+
+ public AMQChannelException getChannelException(AMQConstant code, String message)
+ {
+ return new AMQChannelException(code, message, getClazz(), getMethod(), getMajor(), getMinor());
+ }
+
+ public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause)
+ {
+ return new AMQChannelException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), cause);
+ }
+
+ public AMQConnectionException getConnectionException(AMQConstant code, String message)
+ {
+ return new AMQConnectionException(code, message, getClazz(), getMethod(), getMajor(), getMinor());
+ }
+
+ public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause)
+ {
+ return new AMQConnectionException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), cause);
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
index cfbc9d1828..9a7868f3cd 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
@@ -5,6 +5,5 @@ import org.apache.mina.common.ByteBuffer;
public abstract interface AMQMethodBodyInstanceFactory
{
- public AMQMethodBody newInstance(byte major, byte minor, ByteBuffer buffer, long size) throws AMQFrameDecodingException;
- public AMQMethodBody newInstance(byte major, byte minor, int clazzID, int methodID, ByteBuffer buffer, long size) throws AMQFrameDecodingException;
+ public AMQMethodBody newInstance(ByteBuffer buffer, long size) throws AMQFrameDecodingException;
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodFactory.java
new file mode 100644
index 0000000000..4ffc9e0066
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodFactory.java
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+
+public interface AMQMethodFactory
+{
+
+ // Connection Methods
+
+ ConnectionCloseBody createConnectionClose();
+
+ // Access Methods
+
+ AccessRequestBody createAccessRequest(boolean active, boolean exclusive, boolean passive, boolean read, AMQShortString realm, boolean write);
+
+
+ // Tx Methods
+
+ TxSelectBody createTxSelect();
+
+ TxCommitBody createTxCommit();
+
+ TxRollbackBody createTxRollback();
+
+ // Channel Methods
+
+ ChannelOpenBody createChannelOpen();
+
+ ChannelCloseBody createChannelClose(int replyCode, AMQShortString replyText);
+
+ ChannelFlowBody createChannelFlow(boolean active);
+
+
+ // Exchange Methods
+
+
+ ExchangeBoundBody createExchangeBound(AMQShortString exchangeName,
+ AMQShortString queueName,
+ AMQShortString routingKey);
+
+ ExchangeDeclareBody createExchangeDeclare(AMQShortString name, AMQShortString type, int ticket);
+
+
+ // Queue Methods
+
+ QueueDeclareBody createQueueDeclare(AMQShortString name, FieldTable arguments, boolean autoDelete, boolean durable, boolean exclusive, boolean passive, int ticket);
+
+ QueueBindBody createQueueBind(AMQShortString queueName, AMQShortString exchangeName, AMQShortString routingKey, FieldTable arguments, int ticket);
+
+ QueueDeleteBody createQueueDelete(AMQShortString queueName, boolean ifEmpty, boolean ifUnused, int ticket);
+
+
+ // Message Methods
+
+ // In different versions of the protocol we change the class used for message transfer
+ // abstract this out so the appropriate methods are created
+ AMQMethodBody createRecover(boolean requeue);
+
+ AMQMethodBody createConsumer(AMQShortString tag, AMQShortString queueName, FieldTable arguments, boolean noAck, boolean exclusive, boolean noLocal, int ticket);
+
+ AMQMethodBody createConsumerCancel(AMQShortString consumerTag);
+
+ AMQMethodBody createAcknowledge(long deliveryTag, boolean multiple);
+
+ AMQMethodBody createRejectBody(long deliveryTag, boolean requeue);
+
+ AMQMethodBody createMessageQos(int prefetchCount, int prefetchSize);
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
index 1045b02868..8b784fa3f7 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
@@ -24,7 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
-public class BasicContentHeaderProperties implements ContentHeaderProperties
+public class BasicContentHeaderProperties implements CommonContentHeaderProperties
{
private static final Logger _logger = Logger.getLogger(BasicContentHeaderProperties.class);
@@ -421,14 +421,14 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties
}
}
- public AMQShortString getContentTypeShortString()
+ public AMQShortString getContentType()
{
decodeContentTypeIfNecessary();
return _contentType;
}
- public String getContentType()
+ public String getContentTypeAsString()
{
decodeContentTypeIfNecessary();
return _contentType == null ? null : _contentType.toString();
@@ -444,15 +444,19 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties
public void setContentType(String contentType)
{
- clearEncodedForm();
- _propertyFlags |= (1 << 15);
- _contentType = contentType == null ? null : new AMQShortString(contentType);
+ setContentType(contentType == null ? null : new AMQShortString(contentType));
+ }
+
+ public String getEncodingAsString()
+ {
+
+ return getEncoding() == null ? null : getEncoding().toString();
}
- public String getEncoding()
+ public AMQShortString getEncoding()
{
decodeIfNecessary();
- return _encoding == null ? null : _encoding.toString();
+ return _encoding;
}
public void setEncoding(String encoding)
@@ -462,6 +466,14 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties
_encoding = encoding == null ? null : new AMQShortString(encoding);
}
+ public void setEncoding(AMQShortString encoding)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 14);
+ _encoding = encoding;
+ }
+
+
public FieldTable getHeaders()
{
decodeHeadersIfNecessary();
@@ -508,7 +520,13 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties
_priority = priority;
}
- public String getCorrelationId()
+ public AMQShortString getCorrelationId()
+ {
+ decodeIfNecessary();
+ return _correlationId;
+ }
+
+ public String getCorrelationIdAsString()
{
decodeIfNecessary();
return _correlationId == null ? null : _correlationId.toString();
@@ -516,18 +534,23 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties
public void setCorrelationId(String correlationId)
{
+ setCorrelationId(correlationId == null ? null : new AMQShortString(correlationId));
+ }
+
+ public void setCorrelationId(AMQShortString correlationId)
+ {
clearEncodedForm();
_propertyFlags |= (1 << 10);
- _correlationId = correlationId == null ? null : new AMQShortString(correlationId);
+ _correlationId = correlationId;
}
- public String getReplyTo()
+ public String getReplyToAsString()
{
decodeIfNecessary();
return _replyTo == null ? null : _replyTo.toString();
}
- public AMQShortString getReplyToAsShortString()
+ public AMQShortString getReplyTo()
{
decodeIfNecessary();
return _replyTo;
@@ -561,7 +584,13 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties
}
- public String getMessageId()
+ public AMQShortString getMessageId()
+ {
+ decodeIfNecessary();
+ return _messageId;
+ }
+
+ public String getMessageIdAsString()
{
decodeIfNecessary();
return _messageId == null ? null : _messageId.toString();
@@ -574,6 +603,14 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties
_messageId = messageId == null ? null : new AMQShortString(messageId);
}
+ public void setMessageId(AMQShortString messageId)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 7);
+ _messageId = messageId;
+ }
+
+
public long getTimestamp()
{
decodeIfNecessary();
@@ -587,56 +624,102 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties
_timestamp = timestamp;
}
- public String getType()
+ public String getTypeAsString()
{
decodeIfNecessary();
return _type == null ? null : _type.toString();
}
+
+ public AMQShortString getType()
+ {
+ decodeIfNecessary();
+ return _type;
+ }
+
+
public void setType(String type)
{
+ setType(type == null ? null : new AMQShortString(type));
+ }
+
+ public void setType(AMQShortString type)
+ {
clearEncodedForm();
_propertyFlags |= (1 << 5);
- _type = type == null ? null : new AMQShortString(type);
+ _type = type;
}
- public String getUserId()
+ public String getUserIdAsString()
{
decodeIfNecessary();
return _userId == null ? null : _userId.toString();
}
+ public AMQShortString getUserId()
+ {
+ decodeIfNecessary();
+ return _userId;
+ }
+
public void setUserId(String userId)
{
+ setUserId(userId == null ? null : new AMQShortString(userId));
+ }
+
+ public void setUserId(AMQShortString userId)
+ {
clearEncodedForm();
_propertyFlags |= (1 << 4);
- _userId = userId == null ? null : new AMQShortString(userId);
+ _userId = userId;
}
- public String getAppId()
+ public String getAppIdAsString()
{
decodeIfNecessary();
return _appId == null ? null : _appId.toString();
}
+ public AMQShortString getAppId()
+ {
+ decodeIfNecessary();
+ return _appId;
+ }
+
public void setAppId(String appId)
{
+ setAppId(appId == null ? null : new AMQShortString(appId));
+ }
+
+ public void setAppId(AMQShortString appId)
+ {
clearEncodedForm();
_propertyFlags |= (1 << 3);
- _appId = appId == null ? null : new AMQShortString(appId);
+ _appId = appId;
}
- public String getClusterId()
+ public String getClusterIdAsString()
{
decodeIfNecessary();
return _clusterId == null ? null : _clusterId.toString();
}
+ public AMQShortString getClusterId()
+ {
+ decodeIfNecessary();
+ return _clusterId;
+ }
+
public void setClusterId(String clusterId)
{
+ setClusterId(clusterId == null ? null : new AMQShortString(clusterId));
+ }
+
+ public void setClusterId(AMQShortString clusterId)
+ {
clearEncodedForm();
_propertyFlags |= (1 << 2);
- _clusterId = clusterId == null ? null : new AMQShortString(clusterId);
+ _clusterId = clusterId;
}
public String toString()
diff --git a/java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java
new file mode 100644
index 0000000000..1641cbf4e8
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java
@@ -0,0 +1,65 @@
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.log4j.Logger;
+
+public interface CommonContentHeaderProperties extends ContentHeaderProperties
+{
+
+ AMQShortString getContentType();
+
+ void setContentType(AMQShortString contentType);
+
+ FieldTable getHeaders();
+
+ void setHeaders(FieldTable headers);
+
+ byte getDeliveryMode();
+
+ void setDeliveryMode(byte deliveryMode);
+
+ byte getPriority();
+
+ void setPriority(byte priority);
+
+ AMQShortString getCorrelationId();
+
+ void setCorrelationId(AMQShortString correlationId);
+
+ AMQShortString getReplyTo();
+
+ void setReplyTo(AMQShortString replyTo);
+
+ long getExpiration();
+
+ void setExpiration(long expiration);
+
+ AMQShortString getMessageId();
+
+ void setMessageId(AMQShortString messageId);
+
+ long getTimestamp();
+
+ void setTimestamp(long timestamp);
+
+ AMQShortString getType();
+
+ void setType(AMQShortString type);
+
+ AMQShortString getUserId();
+
+ void setUserId(AMQShortString userId);
+
+ AMQShortString getAppId();
+
+ void setAppId(AMQShortString appId);
+
+ AMQShortString getClusterId();
+
+ void setClusterId(AMQShortString clusterId);
+
+ AMQShortString getEncoding();
+
+ void setEncoding(AMQShortString encoding);
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
index be38695384..a1aaab06c6 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
@@ -22,7 +22,7 @@ package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
-public class ContentBody extends AMQBody
+public class ContentBody extends AMQBodyImpl
{
public static final byte TYPE = 3;
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java
index 5636229d53..7b6a92e691 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java
@@ -39,7 +39,7 @@ public class ContentBodyFactory implements BodyFactory
_log.debug("Creating content body factory");
}
- public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
+ public AMQBodyImpl createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
{
return new ContentBody(in, bodySize);
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
index 02631a5f88..c71f47bad2 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
@@ -22,7 +22,7 @@ package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
-public class ContentHeaderBody extends AMQBody
+public class ContentHeaderBody extends AMQBodyImpl
{
public static final byte TYPE = 2;
@@ -110,7 +110,7 @@ public class ContentHeaderBody extends AMQBody
properties.writePropertyListPayload(buffer);
}
- public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties,
+ public static AMQFrame createAMQFrame(int channelId, int classId, int weight, CommonContentHeaderProperties properties,
long bodySize)
{
return new AMQFrame(channelId, new ContentHeaderBody(classId, weight, properties, bodySize));
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java
index 818fc9cf0c..9570ec800d 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java
@@ -39,7 +39,7 @@ public class ContentHeaderBodyFactory implements BodyFactory
_log.debug("Creating content header body factory");
}
- public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
+ public AMQBodyImpl createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
{
// all content headers are the same - it is only the properties that differ.
// the content header body further delegates construction of properties
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
index 7dac018872..a8a8097fd2 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.framing;
+import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
+
import org.apache.mina.common.ByteBuffer;
public class ContentHeaderPropertiesFactory
@@ -43,7 +45,7 @@ public class ContentHeaderPropertiesFactory
// AMQP version change: "Hardwired" version to major=8, minor=0
// TODO: Change so that the actual version is obtained from
// the ProtocolInitiation object for this session.
- if (classId == BasicConsumeBody.getClazz((byte)8, (byte)0))
+ if (classId == BasicConsumeBodyImpl.CLASS_ID)
{
properties = new BasicContentHeaderProperties();
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
index 246e5ebc90..a7544c5747 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
@@ -41,10 +41,14 @@ public class FieldTable
private LinkedHashMap<AMQShortString, AMQTypedValue> _properties;
private long _encodedSize;
private static final int INITIAL_HASHMAP_CAPACITY = 16;
+ private static final int INITIAL_ENCODED_FORM_SIZE = 256;
public FieldTable()
{
super();
+// _encodedForm = ByteBuffer.allocate(INITIAL_ENCODED_FORM_SIZE);
+// _encodedForm.setAutoExpand(true);
+// _encodedForm.limit(0);
}
/**
@@ -109,11 +113,28 @@ public class FieldTable
private AMQTypedValue setProperty(AMQShortString key, AMQTypedValue val)
{
initMapIfNecessary();
- _encodedForm = null;
- if(val == null)
+ if(_properties.containsKey(key))
+ {
+ _encodedForm = null;
+
+ if(val == null)
+ {
+ return removeKey(key);
+ }
+ }
+ else if(_encodedForm != null && val != null)
+ {
+ EncodingUtils.writeShortStringBytes(_encodedForm, key);
+ val.writeToBuffer(_encodedForm);
+
+ }
+ else if (val == null)
{
- return removeKey(key);
+ return null;
}
+
+
+
AMQTypedValue oldVal = _properties.put(key,val);
if(oldVal != null)
{
@@ -134,7 +155,7 @@ public class FieldTable
{
if(_properties == null)
{
- if(_encodedForm == null)
+ if(_encodedForm == null || _encodedSize == 0)
{
_properties = new LinkedHashMap<AMQShortString,AMQTypedValue>();
}
@@ -655,6 +676,7 @@ public class FieldTable
if (trace)
{
_logger.trace("FieldTable::writeToBuffer: Writing encoded length of " + getEncodedSize() + "...");
+ _logger.trace(_properties);
}
EncodingUtils.writeUnsignedInteger(buffer, getEncodedSize());
@@ -701,6 +723,7 @@ public class FieldTable
public void addAll(FieldTable fieldTable)
{
initMapIfNecessary();
+ _encodedForm = null;
_properties.putAll(fieldTable._properties);
recalculateEncodedSize();
}
@@ -836,7 +859,13 @@ public class FieldTable
if(_encodedForm != null)
{
- buffer.put(_encodedForm);
+
+ if(_encodedForm.position() != 0)
+ {
+ _encodedForm.flip();
+ }
+// _encodedForm.limit((int)getEncodedSize());
+ buffer.put(_encodedForm);
}
else if(_properties != null)
{
@@ -924,4 +953,33 @@ public class FieldTable
}
}
+ public int hashCode()
+ {
+ initMapIfNecessary();
+ return _properties.hashCode();
+ }
+
+
+ public boolean equals(Object o)
+ {
+ if(o == this)
+ {
+ return true;
+ }
+ if(o == null)
+ {
+ return false;
+ }
+ if(!(o instanceof FieldTable))
+ {
+ return false;
+ }
+
+ initMapIfNecessary();
+
+ FieldTable f = (FieldTable) o;
+ f.initMapIfNecessary();
+
+ return _properties.equals(f._properties);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
index 7246c4a1cf..17b2a2f9c2 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
@@ -22,7 +22,7 @@ package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
-public class HeartbeatBody extends AMQBody
+public class HeartbeatBody extends AMQBodyImpl
{
public static final byte TYPE = 8;
public static AMQFrame FRAME = new HeartbeatBody().toFrame();
@@ -46,12 +46,12 @@ public class HeartbeatBody extends AMQBody
return TYPE;
}
- protected int getSize()
+ public int getSize()
{
return 0;//heartbeats we generate have no payload
}
- protected void writePayload(ByteBuffer buffer)
+ public void writePayload(ByteBuffer buffer)
{
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
index c7ada708dc..2249f1d1cf 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
@@ -24,7 +24,7 @@ import org.apache.mina.common.ByteBuffer;
public class HeartbeatBodyFactory implements BodyFactory
{
- public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
+ public AMQBodyImpl createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
{
return new HeartbeatBody();
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/MainRegistry.java b/java/common/src/main/java/org/apache/qpid/framing/MainRegistry.java
new file mode 100644
index 0000000000..d75589f914
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/MainRegistry.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+public class MainRegistry
+{
+
+ public static VersionSpecificRegistry getVersionSpecificRegistry(byte versionMajor, byte versionMinor)
+ {
+ return null; //To change body of created methods use File | Settings | File Templates.
+ }
+
+ public static VersionSpecificRegistry getVersionSpecificRegistry(ProtocolVersion pv)
+ {
+ return null; //To change body of created methods use File | Settings | File Templates.
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java b/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java
index dd93cc97fa..f253372a65 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java
@@ -4,6 +4,7 @@ import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
+import org.apache.qpid.framing.amqp_8_0.BasicPublishBodyImpl;
import org.apache.mina.common.ByteBuffer;
@@ -19,7 +20,7 @@ public class MethodConverter_8_0 extends AbstractMethodConverter implements Prot
}
- public AMQBody convertToBody(ContentChunk contentChunk)
+ public AMQBodyImpl convertToBody(ContentChunk contentChunk)
{
return new ContentBody(contentChunk.getData());
}
@@ -52,8 +53,8 @@ public class MethodConverter_8_0 extends AbstractMethodConverter implements Prot
public void configure()
{
- _basicPublishClassId = BasicPublishBody.getClazz(getProtocolMajorVersion(),getProtocolMinorVersion());
- _basicPublishMethodId = BasicPublishBody.getMethod(getProtocolMajorVersion(),getProtocolMinorVersion());
+ _basicPublishClassId = BasicPublishBodyImpl.CLASS_ID;
+ _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID;
}
@@ -87,18 +88,15 @@ public class MethodConverter_8_0 extends AbstractMethodConverter implements Prot
}
- public AMQMethodBody convertToBody(MessagePublishInfo info)
+ public AMQMethodBodyImpl convertToBody(MessagePublishInfo info)
{
- return new BasicPublishBody(getProtocolMajorVersion(),
- getProtocolMinorVersion(),
- _basicPublishClassId,
- _basicPublishMethodId,
- info.getExchange(),
- info.isImmediate(),
+ return new BasicPublishBodyImpl(0, // ticket
+ info.getExchange(),
+ info.getRoutingKey(),
info.isMandatory(),
- info.getRoutingKey(),
- 0) ; // ticket
+ info.isImmediate()
+ ) ;
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
index 697a0f4249..8b40fe72eb 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
@@ -25,25 +25,50 @@ import org.apache.mina.common.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.qpid.AMQException;
+import java.io.UnsupportedEncodingException;
+
public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock
{
- public char[] header = new char[]{'A','M','Q','P'};
+
// TODO: generate these constants automatically from the xml protocol spec file
+ public static final byte[] AMQP_HEADER = new byte[]{(byte)'A',(byte)'M',(byte)'Q',(byte)'P'};
- private static byte CURRENT_PROTOCOL_CLASS = 1;
- private static final int CURRENT_PROTOCOL_INSTANCE = 1;
+ private static final byte CURRENT_PROTOCOL_CLASS = 1;
+ private static final byte TCP_PROTOCOL_INSTANCE = 1;
+
+ public final byte[] _protocolHeader;
+ public final byte _protocolClass;
+ public final byte _protocolInstance;
+ public final byte _protocolMajor;
+ public final byte _protocolMinor;
- public byte protocolClass = CURRENT_PROTOCOL_CLASS;
- public byte protocolInstance = CURRENT_PROTOCOL_INSTANCE;
- public byte protocolMajor;
- public byte protocolMinor;
// public ProtocolInitiation() {}
- public ProtocolInitiation(byte major, byte minor)
+ public ProtocolInitiation(byte[] protocolHeader, byte protocolClass, byte protocolInstance, byte protocolMajor, byte protocolMinor)
+ {
+ _protocolHeader = protocolHeader;
+ _protocolClass = protocolClass;
+ _protocolInstance = protocolInstance;
+ _protocolMajor = protocolMajor;
+ _protocolMinor = protocolMinor;
+ }
+
+ public ProtocolInitiation(ProtocolVersion pv)
{
- protocolMajor = major;
- protocolMinor = minor;
+ this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, TCP_PROTOCOL_INSTANCE, pv.getMajorVersion(), pv.getMinorVersion());
+ }
+
+
+ public ProtocolInitiation(ByteBuffer in)
+ {
+ _protocolHeader = new byte[4];
+ in.get(_protocolHeader);
+
+ _protocolClass = in.get();
+ _protocolInstance = in.get();
+ _protocolMajor = in.get();
+ _protocolMinor = in.get();
}
public long getSize()
@@ -53,19 +78,12 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
public void writePayload(ByteBuffer buffer)
{
- for (int i = 0; i < header.length; i++)
- {
- buffer.put((byte) header[i]);
- }
- buffer.put(protocolClass);
- buffer.put(protocolInstance);
- buffer.put(protocolMajor);
- buffer.put(protocolMinor);
- }
- public void populateFromBuffer(ByteBuffer buffer) throws AMQException
- {
- throw new AMQException("Method not implemented");
+ buffer.put(_protocolHeader);
+ buffer.put(_protocolClass);
+ buffer.put(_protocolInstance);
+ buffer.put(_protocolMajor);
+ buffer.put(_protocolMinor);
}
public boolean equals(Object o)
@@ -76,36 +94,36 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
}
ProtocolInitiation pi = (ProtocolInitiation) o;
- if (pi.header == null)
+ if (pi._protocolHeader == null)
{
return false;
}
- if (header.length != pi.header.length)
+ if (_protocolHeader.length != pi._protocolHeader.length)
{
return false;
}
- for (int i = 0; i < header.length; i++)
+ for (int i = 0; i < _protocolHeader.length; i++)
{
- if (header[i] != pi.header[i])
+ if (_protocolHeader[i] != pi._protocolHeader[i])
{
return false;
}
}
- return (protocolClass == pi.protocolClass &&
- protocolInstance == pi.protocolInstance &&
- protocolMajor == pi.protocolMajor &&
- protocolMinor == pi.protocolMinor);
+ return (_protocolClass == pi._protocolClass &&
+ _protocolInstance == pi._protocolInstance &&
+ _protocolMajor == pi._protocolMajor &&
+ _protocolMinor == pi._protocolMinor);
}
public static class Decoder //implements MessageDecoder
{
/**
*
- * @param session
- * @param in
+ * @param session the session
+ * @param in input buffer
* @return true if we have enough data to decode the PI frame fully, false if more
* data is required
*/
@@ -115,63 +133,62 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
}
public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out)
- throws Exception
{
- byte[] theHeader = new byte[4];
- in.get(theHeader);
- ProtocolInitiation pi = new ProtocolInitiation((byte)0, (byte)0);
- pi.header = new char[]{(char) theHeader[0],(char) theHeader[CURRENT_PROTOCOL_INSTANCE],(char) theHeader[2], (char) theHeader[3]};
- String stringHeader = new String(pi.header);
- if (!"AMQP".equals(stringHeader))
- {
- throw new AMQProtocolHeaderException("Invalid protocol header - read " + stringHeader);
- }
- pi.protocolClass = in.get();
- pi.protocolInstance = in.get();
- pi.protocolMajor = in.get();
- pi.protocolMinor = in.get();
+ ProtocolInitiation pi = new ProtocolInitiation(in);
out.write(pi);
}
}
- public void checkVersion(ProtocolVersionList pvl) throws AMQException
+ public void checkVersion() throws AMQException
{
- if (protocolClass != CURRENT_PROTOCOL_CLASS)
- {
- throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS + " was expected; received " +
- protocolClass);
- }
- if (protocolInstance != CURRENT_PROTOCOL_INSTANCE)
+
+ if(_protocolHeader.length != 4)
{
- throw new AMQProtocolInstanceException("Protocol instance " + CURRENT_PROTOCOL_INSTANCE + " was expected; received " +
- protocolInstance);
+ throw new AMQProtocolHeaderException("Protocol header should have exactly four octets");
}
-
- /* Look through list of available protocol versions */
- boolean found = false;
- for (int i=0; i<pvl.pv.length; i++)
+ for(int i = 0; i < 4; i++)
{
- if (pvl.pv[i][pvl.PROTOCOL_MAJOR] == protocolMajor &&
- pvl.pv[i][pvl.PROTOCOL_MINOR] == protocolMinor)
+ if(_protocolHeader[i] != AMQP_HEADER[i])
{
- found = true;
+ try
+ {
+ throw new AMQProtocolHeaderException("Protocol header is not correct: Got " + new String(_protocolHeader,"ISO-8859-1") + " should be: " + new String(AMQP_HEADER, "ISO-8859-1"));
+ }
+ catch (UnsupportedEncodingException e)
+ {
+
+ }
}
}
- if (!found)
+ if (_protocolClass != CURRENT_PROTOCOL_CLASS)
+ {
+ throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS + " was expected; received " +
+ _protocolClass);
+ }
+ if (_protocolInstance != TCP_PROTOCOL_INSTANCE)
+ {
+ throw new AMQProtocolInstanceException("Protocol instance " + TCP_PROTOCOL_INSTANCE + " was expected; received " +
+ _protocolInstance);
+ }
+
+ ProtocolVersion pv = new ProtocolVersion(_protocolMajor, _protocolMinor);
+
+
+ if (!pv.isSupported())
{
// TODO: add list of available versions in list to msg...
throw new AMQProtocolVersionException("Protocol version " +
- protocolMajor + "." + protocolMinor + " not found in protocol version list.");
+ _protocolMajor + "." + _protocolMinor + " not suppoerted by this version of the Qpid broker.");
}
}
public String toString()
{
- StringBuffer buffer = new StringBuffer(new String(header));
- buffer.append(Integer.toHexString(protocolClass));
- buffer.append(Integer.toHexString(protocolInstance));
- buffer.append(Integer.toHexString(protocolMajor));
- buffer.append(Integer.toHexString(protocolMinor));
+ StringBuffer buffer = new StringBuffer(new String(_protocolHeader));
+ buffer.append(Integer.toHexString(_protocolClass));
+ buffer.append(Integer.toHexString(_protocolInstance));
+ buffer.append(Integer.toHexString(_protocolMajor));
+ buffer.append(Integer.toHexString(_protocolMinor));
return buffer.toString();
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
index ec371453aa..ebe0b91cf4 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
@@ -179,7 +179,7 @@ public class VersionSpecificRegistry
}
- return bodyFactory.newInstance(_protocolMajorVersion, _protocolMinorVersion, classID, methodID, in, size);
+ return bodyFactory.newInstance(in, size);
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java
index c9e15f18e3..d5da133837 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java
@@ -18,12 +18,13 @@
package org.apache.qpid.framing.abstraction;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.framing.AMQMethodBody;
public interface MessagePublishInfoConverter
{
public MessagePublishInfo convertToInfo(AMQMethodBody body);
- public AMQMethodBody convertToBody(MessagePublishInfo info);
+ public AMQMethodBodyImpl convertToBody(MessagePublishInfo info);
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
index 52e82cdf07..b8e460eb05 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
@@ -18,11 +18,12 @@
package org.apache.qpid.framing.abstraction;
+import org.apache.qpid.framing.AMQBodyImpl;
import org.apache.qpid.framing.AMQBody;
public interface ProtocolVersionMethodConverter extends MessagePublishInfoConverter
{
- AMQBody convertToBody(ContentChunk contentBody);
+ AMQBodyImpl convertToBody(ContentChunk contentBody);
ContentChunk convertToContentChunk(AMQBody body);
void configure();
diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java
new file mode 100644
index 0000000000..5e7a04cfc5
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java
@@ -0,0 +1,160 @@
+package org.apache.qpid.framing.amqp_8_0;
+
+import org.apache.qpid.framing.EncodingUtils;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+import org.apache.mina.common.ByteBuffer;
+
+public abstract class AMQMethodBody_8_0 extends org.apache.qpid.framing.AMQMethodBodyImpl
+{
+
+ public byte getMajor()
+ {
+ return 8;
+ }
+
+ public byte getMinor()
+ {
+ return 0;
+ }
+
+ public int getSize()
+ {
+ return 2 + 2 + getBodySize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ EncodingUtils.writeUnsignedShort(buffer, getClazz());
+ EncodingUtils.writeUnsignedShort(buffer, getMethod());
+ writeMethodPayload(buffer);
+ }
+
+
+ protected byte readByte(ByteBuffer buffer)
+ {
+ return buffer.get();
+ }
+
+ protected AMQShortString readAMQShortString(ByteBuffer buffer)
+ {
+ return EncodingUtils.readAMQShortString(buffer);
+ }
+
+ protected int getSizeOf(AMQShortString string)
+ {
+ return EncodingUtils.encodedShortStringLength(string);
+ }
+
+ protected void writeByte(ByteBuffer buffer, byte b)
+ {
+ buffer.put(b);
+ }
+
+ protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, string);
+ }
+
+ protected int readInt(ByteBuffer buffer)
+ {
+ return buffer.getInt();
+ }
+
+ protected void writeInt(ByteBuffer buffer, int i)
+ {
+ buffer.putInt(i);
+ }
+
+ protected FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException
+ {
+ return EncodingUtils.readFieldTable(buffer);
+ }
+
+ protected int getSizeOf(FieldTable table)
+ {
+ return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected void writeFieldTable(ByteBuffer buffer, FieldTable table)
+ {
+ EncodingUtils.writeFieldTableBytes(buffer, table);
+ }
+
+ protected long readLong(ByteBuffer buffer)
+ {
+ return buffer.getLong();
+ }
+
+ protected void writeLong(ByteBuffer buffer, long l)
+ {
+ buffer.putLong(l);
+ }
+
+ protected int getSizeOf(byte[] response)
+ {
+ return response.length; //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected void writeBytes(ByteBuffer buffer, byte[] data)
+ {
+ EncodingUtils.writeLongstr(buffer,data);
+ }
+
+ protected byte[] readBytes(ByteBuffer buffer)
+ {
+ return EncodingUtils.readLongstr(buffer);
+ }
+
+ protected short readShort(ByteBuffer buffer)
+ {
+ return EncodingUtils.readShort(buffer);
+ }
+
+ protected void writeShort(ByteBuffer buffer, short s)
+ {
+ EncodingUtils.writeShort(buffer, s);
+ }
+
+ protected short readUnsignedByte(ByteBuffer buffer)
+ {
+ return buffer.getUnsigned();
+ }
+
+ protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte)
+ {
+ EncodingUtils.writeUnsignedByte(buffer, unsignedByte);
+ }
+
+ protected byte readBitfield(ByteBuffer buffer)
+ {
+ return readByte(buffer);
+ }
+
+ protected int readUnsignedShort(ByteBuffer buffer)
+ {
+ return buffer.getUnsignedShort();
+ }
+
+ protected void writeBitfield(ByteBuffer buffer, byte bitfield0)
+ {
+ buffer.put(bitfield0);
+ }
+
+ protected void writeUnsignedShort(ByteBuffer buffer, int s)
+ {
+ EncodingUtils.writeUnsignedShort(buffer, s);
+ }
+
+ protected long readUnsignedInteger(ByteBuffer buffer)
+ {
+ return buffer.getUnsignedInt();
+ }
+ protected void writeUnsignedInteger(ByteBuffer buffer, long i)
+ {
+ EncodingUtils.writeUnsignedInteger(buffer, i);
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodFactory_8_0.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodFactory_8_0.java
new file mode 100644
index 0000000000..188ed07a70
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodFactory_8_0.java
@@ -0,0 +1,117 @@
+package org.apache.qpid.framing.amqp_8_0;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.protocol.AMQConstant;
+
+import org.apache.mina.common.ByteBuffer;
+
+
+public class AMQMethodFactory_8_0 implements AMQMethodFactory
+{
+ private static final AMQShortString CLIENT_INITIATED_CONNECTION_CLOSE =
+ new AMQShortString("Client initiated connection close");
+
+ public ConnectionCloseBody createConnectionClose()
+ {
+ return new ConnectionCloseBodyImpl(AMQConstant.REPLY_SUCCESS.getCode(),
+ CLIENT_INITIATED_CONNECTION_CLOSE,
+ 0,
+ 0);
+ }
+
+ public AccessRequestBody createAccessRequest(boolean active, boolean exclusive, boolean passive, boolean read, AMQShortString realm, boolean write)
+ {
+ return new AccessRequestBodyImpl(realm,exclusive,passive,active,write,read);
+ }
+
+ public TxSelectBody createTxSelect()
+ {
+ return new TxSelectBodyImpl();
+ }
+
+ public TxCommitBody createTxCommit()
+ {
+ return new TxCommitBodyImpl();
+ }
+
+ public TxRollbackBody createTxRollback()
+ {
+ return new TxRollbackBodyImpl();
+ }
+
+ public ChannelOpenBody createChannelOpen()
+ {
+ return new ChannelOpenBodyImpl((AMQShortString)null);
+ }
+
+ public ChannelCloseBody createChannelClose(int replyCode, AMQShortString replyText)
+ {
+ return new ChannelCloseBodyImpl(replyCode, replyText, 0, 0);
+ }
+
+ public ExchangeDeclareBody createExchangeDeclare(AMQShortString name, AMQShortString type, int ticket)
+ {
+ return new ExchangeDeclareBodyImpl(ticket,name,type,false,false,false,false,false,null);
+ }
+
+ public ExchangeBoundBody createExchangeBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey)
+ {
+ return new ExchangeBoundBodyImpl(exchangeName,routingKey,queueName);
+ }
+
+ public QueueDeclareBody createQueueDeclare(AMQShortString name, FieldTable arguments, boolean autoDelete, boolean durable, boolean exclusive, boolean passive, int ticket)
+ {
+ return new QueueDeclareBodyImpl(ticket,name,passive,durable,exclusive,autoDelete,false,arguments);
+ }
+
+ public QueueBindBody createQueueBind(AMQShortString queueName, AMQShortString exchangeName, AMQShortString routingKey, FieldTable arguments, int ticket)
+ {
+ return new QueueBindBodyImpl(ticket,queueName,exchangeName,routingKey,false,arguments);
+ }
+
+ public QueueDeleteBody createQueueDelete(AMQShortString queueName, boolean ifEmpty, boolean ifUnused, int ticket)
+ {
+ return new QueueDeleteBodyImpl(ticket,queueName,ifUnused,ifEmpty,false);
+ }
+
+ public ChannelFlowBody createChannelFlow(boolean active)
+ {
+ return new ChannelFlowBodyImpl(active);
+ }
+
+
+ // In different versions of the protocol we change the class used for message transfer
+ // abstract this out so the appropriate methods are created
+ public AMQMethodBody createRecover(boolean requeue)
+ {
+ return new BasicRecoverBodyImpl(requeue);
+ }
+
+ public AMQMethodBody createConsumer(AMQShortString tag, AMQShortString queueName, FieldTable arguments, boolean noAck, boolean exclusive, boolean noLocal, int ticket)
+ {
+ return new BasicConsumeBodyImpl(ticket,queueName,tag,noLocal,noAck,exclusive,false,arguments);
+ }
+
+ public AMQMethodBody createConsumerCancel(AMQShortString consumerTag)
+ {
+ return new BasicCancelBodyImpl(consumerTag, false);
+ }
+
+ public AMQMethodBody createAcknowledge(long deliveryTag, boolean multiple)
+ {
+ return new BasicAckBodyImpl(deliveryTag,multiple);
+ }
+
+ public AMQMethodBody createRejectBody(long deliveryTag, boolean requeue)
+ {
+ return new BasicRejectBodyImpl(deliveryTag, requeue);
+ }
+
+ public AMQMethodBody createMessageQos(int prefetchCount, int prefetchSize)
+ {
+ return new BasicQosBodyImpl(prefetchSize, prefetchCount, false);
+ }
+
+
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
index 8126ca4bc8..17a2ec5d4e 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
@@ -136,6 +136,7 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH
public void exceptionCaught(final NextFilter nextFilter, final IoSession session,
final Throwable cause) throws Exception
{
+ cause.printStackTrace();
nextFilter.exceptionCaught(session,cause);
}
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java
index ab36041cb8..db76b6fe7e 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.protocol;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.framing.AMQMethodBody;
/**
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java
index 85bbe50b11..75ae6645e8 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java
@@ -21,6 +21,7 @@
package org.apache.qpid.protocol;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.framing.AMQMethodBody;
/**
@@ -32,7 +33,6 @@ public interface AMQMethodListener
/**
* Invoked when a method frame has been received
* @param evt the event that contains the method and channel
- * @param protocolSession the protocol session associated with the event
* @return true if the handler has processed the method frame, false otherwise. Note
* that this does not prohibit the method event being delivered to subsequent listeners
* but can be used to determine if nobody has dealt with an incoming method frame.
@@ -40,7 +40,7 @@ public interface AMQMethodListener
* to all registered listeners using the error() method (see below) allowing them to
* perform cleanup if necessary.
*/
- <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws Exception;
+ <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException;
/**
* Callback when an error has occurred. Allows listeners to clean up.
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
index b57c26e496..65f60e7f59 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
@@ -20,9 +20,9 @@
*/
package org.apache.qpid.protocol;
-import org.apache.qpid.framing.VersionSpecificRegistry;
+import org.apache.qpid.framing.MethodRegistry;
public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, ProtocolVersionAware
{
- public VersionSpecificRegistry getRegistry();
+ public MethodRegistry getRegistry();
}
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java
index 64db953bc2..c2c0bf29b7 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java
@@ -20,9 +20,10 @@
*/
package org.apache.qpid.protocol;
+import org.apache.qpid.framing.ProtocolVersion;
+
public interface ProtocolVersionAware
{
- public byte getProtocolMinorVersion();
+ public ProtocolVersion getProtocolVersion();
- public byte getProtocolMajorVersion();
}