summaryrefslogtreecommitdiff
path: root/java/common/src/main/java/org/apache/qpid/framing
diff options
context:
space:
mode:
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid/framing')
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java247
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java45
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java46
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java17
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java13
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java13
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java26
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java22
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java15
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java11
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java21
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java22
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java16
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java12
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java9
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncOkBody.java1
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java13
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java19
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java33
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java15
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java19
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ChannelCloseOkBody.java12
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java29
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java25
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java35
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java12
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java16
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java27
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java9
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java22
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java20
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java14
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java14
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentBody.java17
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java50
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java106
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java52
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java15
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java12
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java27
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java17
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java503
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java20
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java32
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java197
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java312
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/MethodRegistrySource.java26
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java25
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java25
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java15
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java20
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java9
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java16
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java9
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java22
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/QueueUnbindOkBody.java2
63 files changed, 1599 insertions, 857 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
index 291b7e8d29..0a0a570bc3 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
@@ -31,15 +31,6 @@ import org.apache.qpid.protocol.AMQConstant;
public class AMQDataBlockDecoder
{
- private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
-
- static
- {
- _bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance();
- _bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance();
- _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory();
- }
-
private Logger _logger = LoggerFactory.getLogger(AMQDataBlockDecoder.class);
private int _maxFrameSize = AMQConstant.FRAME_MIN_SIZE.getCode();
@@ -71,26 +62,13 @@ public class AMQDataBlockDecoder
}
- public AMQFrame createAndPopulateFrame(BodyFactory methodBodyFactory, MarkableDataInput in)
+ public <T> T createAndPopulateFrame(ProtocolVersion pv,
+ MethodProcessor<T> processor,
+ MarkableDataInput in)
throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
{
final byte type = in.readByte();
- BodyFactory bodyFactory;
- if (type == AMQMethodBody.TYPE)
- {
- bodyFactory = methodBodyFactory;
- }
- else
- {
- bodyFactory = _bodiesSupported[type];
- }
-
- if (bodyFactory == null)
- {
- throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Unsupported frame type: " + type);
- }
-
final int channel = in.readUnsignedShort();
final long bodySize = EncodingUtils.readUnsignedInteger(in);
@@ -101,7 +79,24 @@ public class AMQDataBlockDecoder
+ " bodySize = " + bodySize);
}
- AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
+ T result;
+ switch(type)
+ {
+ case 1:
+ result = processMethod(channel, in, processor, pv);
+ break;
+ case 2:
+ result = ContentHeaderBody.process(channel, in, processor, bodySize);
+ break;
+ case 3:
+ result = ContentBody.process(channel, in, processor, bodySize);
+ break;
+ case 8:
+ result = HeartbeatBody.process(channel, in, processor, bodySize);
+ break;
+ default:
+ throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Unsupported frame type: " + type);
+ }
byte marker = in.readByte();
if ((marker & 0xFF) != 0xCE)
@@ -110,11 +105,209 @@ public class AMQDataBlockDecoder
+ " type=" + type);
}
- return frame;
+ return result;
}
public void setMaxFrameSize(final int maxFrameSize)
{
_maxFrameSize = maxFrameSize;
}
+
+ private <T> T processMethod(int channelId, MarkableDataInput in, MethodProcessor<T> dispatcher, ProtocolVersion protocolVersion)
+ throws AMQFrameDecodingException, IOException
+ {
+ final int classAndMethod = in.readInt();
+
+ switch (classAndMethod)
+ {
+ //CONNECTION_CLASS:
+ case 0x000a000a:
+ return ConnectionStartBody.process(in, dispatcher);
+ case 0x000a000b:
+ return ConnectionStartOkBody.process(in, dispatcher);
+ case 0x000a0014:
+ return ConnectionSecureBody.process(in, dispatcher);
+ case 0x000a0015:
+ return ConnectionSecureOkBody.process(in, dispatcher);
+ case 0x000a001e:
+ return ConnectionTuneBody.process(in, dispatcher);
+ case 0x000a001f:
+ return ConnectionTuneOkBody.process(in, dispatcher);
+ case 0x000a0028:
+ return ConnectionOpenBody.process(in, dispatcher);
+ case 0x000a0029:
+ return ConnectionOpenOkBody.process(in, dispatcher);
+ case 0x000a002a:
+ return ConnectionRedirectBody.process(in, dispatcher);
+ case 0x000a0032:
+ if (protocolVersion.equals(ProtocolVersion.v8_0))
+ {
+ return ConnectionRedirectBody.process(in, dispatcher);
+ }
+ else
+ {
+ return ConnectionCloseBody.process(in, dispatcher);
+ }
+ case 0x000a0033:
+ if (protocolVersion.equals(ProtocolVersion.v8_0))
+ {
+ throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), protocolVersion);
+ }
+ else
+ {
+ return dispatcher.connectionCloseOk();
+ }
+ case 0x000a003c:
+ if (protocolVersion.equals(ProtocolVersion.v8_0))
+ {
+ return ConnectionCloseBody.process(in, dispatcher);
+ }
+ else
+ {
+ throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), protocolVersion);
+ }
+ case 0x000a003d:
+ if (protocolVersion.equals(ProtocolVersion.v8_0))
+ {
+ return dispatcher.connectionCloseOk();
+ }
+ else
+ {
+ throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), protocolVersion);
+ }
+
+ // CHANNEL_CLASS:
+
+ case 0x0014000a:
+ return ChannelOpenBody.process(channelId, in, dispatcher);
+ case 0x0014000b:
+ return ChannelOpenOkBody.process(channelId, in, protocolVersion, dispatcher);
+ case 0x00140014:
+ return ChannelFlowBody.process(channelId, in, dispatcher);
+ case 0x00140015:
+ return ChannelFlowOkBody.process(channelId, in, dispatcher);
+ case 0x0014001e:
+ return ChannelAlertBody.process(channelId, in, dispatcher);
+ case 0x00140028:
+ return ChannelCloseBody.process(channelId, in, dispatcher);
+ case 0x00140029:
+ return dispatcher.channelCloseOk(channelId);
+
+ // ACCESS_CLASS:
+
+ case 0x001e000a:
+ return AccessRequestBody.process(channelId, in, dispatcher);
+ case 0x001e000b:
+ return AccessRequestOkBody.process(channelId, in, dispatcher);
+
+ // EXCHANGE_CLASS:
+
+ case 0x0028000a:
+ return ExchangeDeclareBody.process(channelId, in, dispatcher);
+ case 0x0028000b:
+ return dispatcher.exchangeDeclareOk(channelId);
+ case 0x00280014:
+ return ExchangeDeleteBody.process(channelId, in, dispatcher);
+ case 0x00280015:
+ return dispatcher.exchangeDeleteOk(channelId);
+ case 0x00280016:
+ return ExchangeBoundBody.process(channelId, in, dispatcher);
+ case 0x00280017:
+ return ExchangeBoundOkBody.process(channelId, in, dispatcher);
+
+
+ // QUEUE_CLASS:
+
+ case 0x0032000a:
+ return QueueDeclareBody.process(channelId, in, dispatcher);
+ case 0x0032000b:
+ return QueueDeclareOkBody.process(channelId, in, dispatcher);
+ case 0x00320014:
+ return QueueBindBody.process(channelId, in, dispatcher);
+ case 0x00320015:
+ return dispatcher.queueBindOk(channelId);
+ case 0x0032001e:
+ return QueuePurgeBody.process(channelId, in, dispatcher);
+ case 0x0032001f:
+ return QueuePurgeOkBody.process(channelId, in, dispatcher);
+ case 0x00320028:
+ return QueueDeleteBody.process(channelId, in, dispatcher);
+ case 0x00320029:
+ return QueueDeleteOkBody.process(channelId, in, dispatcher);
+ case 0x00320032:
+ return QueueUnbindBody.process(channelId, in, dispatcher);
+ case 0x00320033:
+ return dispatcher.queueUnbindOk(channelId);
+
+
+ // BASIC_CLASS:
+
+ case 0x003c000a:
+ return BasicQosBody.process(channelId, in, dispatcher);
+ case 0x003c000b:
+ return dispatcher.basicQosOk(channelId);
+ case 0x003c0014:
+ return BasicConsumeBody.process(channelId, in, dispatcher);
+ case 0x003c0015:
+ return BasicConsumeOkBody.process(channelId, in, dispatcher);
+ case 0x003c001e:
+ return BasicCancelBody.process(channelId, in, dispatcher);
+ case 0x003c001f:
+ return BasicCancelOkBody.process(channelId, in, dispatcher);
+ case 0x003c0028:
+ return BasicPublishBody.process(channelId, in, dispatcher);
+ case 0x003c0032:
+ return BasicReturnBody.process(channelId, in, dispatcher);
+ case 0x003c003c:
+ return BasicDeliverBody.process(channelId, in, dispatcher);
+ case 0x003c0046:
+ return BasicGetBody.process(channelId, in, dispatcher);
+ case 0x003c0047:
+ return BasicGetOkBody.process(channelId, in, dispatcher);
+ case 0x003c0048:
+ return BasicGetEmptyBody.process(channelId, in, dispatcher);
+ case 0x003c0050:
+ return BasicAckBody.process(channelId, in, dispatcher);
+ case 0x003c005a:
+ return BasicRejectBody.process(channelId, in, dispatcher);
+ case 0x003c0064:
+ return BasicRecoverBody.process(channelId, in, protocolVersion, dispatcher);
+ case 0x003c0065:
+ return dispatcher.basicRecoverSyncOk(channelId);
+ case 0x003c0066:
+ return BasicRecoverSyncBody.process(channelId, in, dispatcher);
+ case 0x003c006e:
+ return BasicRecoverSyncBody.process(channelId, in, dispatcher);
+ case 0x003c006f:
+ return dispatcher.basicRecoverSyncOk(channelId);
+
+ // TX_CLASS:
+
+ case 0x005a000a:
+ return dispatcher.txSelect(channelId);
+ case 0x005a000b:
+ return dispatcher.txSelectOk(channelId);
+ case 0x005a0014:
+ return dispatcher.txCommit(channelId);
+ case 0x005a0015:
+ return dispatcher.txCommitOk(channelId);
+ case 0x005a001e:
+ return dispatcher.txRollback(channelId);
+ case 0x005a001f:
+ return dispatcher.txRollbackOk(channelId);
+
+ default:
+ throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), protocolVersion);
+
+ }
+ }
+
+ private AMQFrameDecodingException newUnknownMethodException(final int classId, final int methodId, ProtocolVersion protocolVersion)
+ {
+ return new AMQFrameDecodingException(AMQConstant.COMMAND_INVALID,
+ "Method " + methodId + " unknown in AMQP version " + protocolVersion
+ + " (while trying to decode class " + classId + " method " + methodId + ".");
+ }
+
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
index 238f28e73e..83397c37d8 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.framing;
-import org.apache.qpid.codec.MarkableDataInput;
-
import java.io.DataOutput;
import java.io.IOException;
@@ -39,12 +37,6 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
_bodyFrame = bodyFrame;
}
- public AMQFrame(final MarkableDataInput in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException, IOException
- {
- this._channel = channel;
- this._bodyFrame = bodyFactory.createBody(in,bodySize);
- }
-
public long getSize()
{
return 1 + 2 + 4 + _bodyFrame.getSize() + 1;
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
deleted file mode 100644
index fc49afb131..0000000000
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.framing;
-
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.codec.MarkableDataInput;
-
-public class AMQMethodBodyFactory implements BodyFactory
-{
- private static final Logger _log = LoggerFactory.getLogger(AMQMethodBodyFactory.class);
-
- private final MethodRegistrySource _registrySource;
-
- public AMQMethodBodyFactory(MethodRegistrySource registrySource)
- {
- _registrySource = registrySource;
- }
-
- public AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException, IOException
- {
- return _registrySource.getMethodRegistry().convertToBody(in, bodySize);
- }
-}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
index d3e58c4444..e40452edea 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
@@ -28,7 +28,6 @@ import java.io.IOException;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
@@ -107,17 +106,6 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody
}
- protected byte readByte(DataInput buffer) throws IOException
- {
- return buffer.readByte();
- }
-
- protected AMQShortString readAMQShortString(MarkableDataInput buffer) throws IOException
- {
- AMQShortString str = buffer.readAMQShortString();
- return str == null ? null : str.intern(false);
- }
-
protected int getSizeOf(AMQShortString string)
{
return EncodingUtils.encodedShortStringLength(string);
@@ -143,11 +131,6 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody
buffer.writeInt(i);
}
- protected FieldTable readFieldTable(DataInput buffer) throws AMQFrameDecodingException, IOException
- {
- return EncodingUtils.readFieldTable(buffer);
- }
-
protected int getSizeOf(FieldTable table)
{
return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates.
@@ -158,11 +141,6 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody
EncodingUtils.writeFieldTableBytes(buffer, table);
}
- protected long readLong(DataInput buffer) throws IOException
- {
- return buffer.readLong();
- }
-
protected void writeLong(DataOutput buffer, long l) throws IOException
{
buffer.writeLong(l);
@@ -178,11 +156,6 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody
EncodingUtils.writeBytes(buffer,data);
}
- protected byte[] readBytes(DataInput buffer) throws IOException
- {
- return EncodingUtils.readBytes(buffer);
- }
-
protected short readShort(DataInput buffer) throws IOException
{
return EncodingUtils.readShort(buffer);
@@ -193,16 +166,6 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody
EncodingUtils.writeShort(buffer, s);
}
- protected byte readBitfield(DataInput buffer) throws IOException
- {
- return readByte(buffer);
- }
-
- protected int readUnsignedShort(DataInput buffer) throws IOException
- {
- return buffer.readUnsignedShort();
- }
-
protected void writeBitfield(DataOutput buffer, byte bitfield0) throws IOException
{
buffer.writeByte(bitfield0);
@@ -213,21 +176,12 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody
EncodingUtils.writeUnsignedShort(buffer, s);
}
- protected long readUnsignedInteger(DataInput buffer) throws IOException
- {
- return EncodingUtils.readUnsignedInteger(buffer);
- }
protected void writeUnsignedInteger(DataOutput buffer, long i) throws IOException
{
EncodingUtils.writeUnsignedInteger(buffer, i);
}
- protected short readUnsignedByte(DataInput buffer) throws IOException
- {
- return (short) buffer.readUnsignedByte();
- }
-
protected void writeUnsignedByte(DataOutput buffer, short unsignedByte) throws IOException
{
EncodingUtils.writeUnsignedByte(buffer, unsignedByte);
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
index 695e41bc13..9a386d4eb4 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
@@ -46,8 +46,8 @@ public class AccessRequestBody extends AMQMethodBodyImpl implements EncodableAMQ
// Constructor
public AccessRequestBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _realm = readAMQShortString( buffer );
- _bitfield0 = readBitfield( buffer );
+ _realm = buffer.readAMQShortString();
+ _bitfield0 = buffer.readByte();
}
public AccessRequestBody(
@@ -165,4 +165,17 @@ public class AccessRequestBody extends AMQMethodBodyImpl implements EncodableAMQ
return buf.toString();
}
+ public static <T> T process(final int channelId,
+ final MarkableDataInput buffer,
+ final MethodProcessor<T> dispatcher) throws IOException
+ {
+ AMQShortString realm = buffer.readAMQShortString();
+ byte bitfield = buffer.readByte();
+ boolean exclusive = (bitfield & 0x01) == 0x1 ;
+ boolean passive = (bitfield & 0x02) == 0x2 ;
+ boolean active = (bitfield & 0x04) == 0x4 ;
+ boolean write = (bitfield & 0x08) == 0x8 ;
+ boolean read = (bitfield & 0x10) == 0x10 ;
+ return dispatcher.accessRequest(channelId, realm, exclusive, passive, active, write, read);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
index c2bd5929fb..8439df9e92 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
@@ -45,7 +45,7 @@ public class AccessRequestOkBody extends AMQMethodBodyImpl implements EncodableA
// Constructor
public AccessRequestOkBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _ticket = readUnsignedShort( buffer );
+ _ticket = buffer.readUnsignedShort();
}
public AccessRequestOkBody(
@@ -95,4 +95,10 @@ public class AccessRequestOkBody extends AMQMethodBodyImpl implements EncodableA
return buf.toString();
}
+ public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ throws IOException
+ {
+ int ticket = buffer.readUnsignedShort();
+ return dispatcher.accessRequestOk(channelId, ticket);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
index b78e27996e..956d10bf0a 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
@@ -46,8 +46,8 @@ public class BasicAckBody extends AMQMethodBodyImpl implements EncodableAMQDataB
// Constructor
public BasicAckBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _deliveryTag = readLong( buffer );
- _bitfield0 = readBitfield( buffer );
+ _deliveryTag = buffer.readLong();
+ _bitfield0 = buffer.readByte();
}
public BasicAckBody(
@@ -112,4 +112,13 @@ public class BasicAckBody extends AMQMethodBodyImpl implements EncodableAMQDataB
return buf.toString();
}
+ public static <T> T process(final int channelId,
+ final MarkableDataInput buffer,
+ final MethodProcessor<T> dispatcher) throws IOException
+ {
+
+ long deliveryTag = buffer.readLong();
+ boolean multiple = (buffer.readByte() & 0x01) != 0;
+ return dispatcher.basicAck(channelId, deliveryTag, multiple);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
index 70783712c2..1c619fd5d4 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
@@ -46,8 +46,8 @@ public class BasicCancelBody extends AMQMethodBodyImpl implements EncodableAMQDa
// Constructor
public BasicCancelBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _consumerTag = readAMQShortString( buffer );
- _bitfield0 = readBitfield( buffer );
+ _consumerTag = buffer.readAMQShortString();
+ _bitfield0 = buffer.readByte();
}
public BasicCancelBody(
@@ -113,4 +113,13 @@ public class BasicCancelBody extends AMQMethodBodyImpl implements EncodableAMQDa
return buf.toString();
}
+ public static <T> T process(final int channelId,
+ final MarkableDataInput buffer,
+ final MethodProcessor<T> dispatcher) throws IOException
+ {
+
+ AMQShortString consumerTag = buffer.readAMQShortString();
+ boolean noWait = (buffer.readByte() & 0x01) == 0x01;
+ return dispatcher.basicCancel(channelId, consumerTag, noWait);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
index 7cec873cc4..c85223cd4f 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
@@ -45,7 +45,7 @@ public class BasicCancelOkBody extends AMQMethodBodyImpl implements EncodableAMQ
// Constructor
public BasicCancelOkBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _consumerTag = readAMQShortString( buffer );
+ _consumerTag = buffer.readAMQShortString();
}
public BasicCancelOkBody(
@@ -96,4 +96,10 @@ public class BasicCancelOkBody extends AMQMethodBodyImpl implements EncodableAMQ
return buf.toString();
}
+ public static <T> T process(final int channelId, final MarkableDataInput in, final MethodProcessor<T> dispatcher)
+ throws IOException
+ {
+ AMQShortString consumerTag = in.readAMQShortString();
+ return dispatcher.basicCancelOk(channelId, consumerTag);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
index f6a4c7e659..1d6ec46c9a 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
@@ -49,11 +49,11 @@ public class BasicConsumeBody extends AMQMethodBodyImpl implements EncodableAMQD
// Constructor
public BasicConsumeBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _ticket = readUnsignedShort( buffer );
- _queue = readAMQShortString( buffer );
- _consumerTag = readAMQShortString( buffer );
- _bitfield0 = readBitfield( buffer );
- _arguments = readFieldTable( buffer );
+ _ticket = buffer.readUnsignedShort();
+ _queue = buffer.readAMQShortString();
+ _consumerTag = buffer.readAMQShortString();
+ _bitfield0 = buffer.readByte();
+ _arguments = EncodingUtils.readFieldTable(buffer);
}
public BasicConsumeBody(
@@ -191,4 +191,20 @@ public class BasicConsumeBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
+ public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ throws IOException, AMQFrameDecodingException
+ {
+
+ int ticket = buffer.readUnsignedShort();
+ AMQShortString queue = buffer.readAMQShortString();
+ AMQShortString consumerTag = buffer.readAMQShortString();
+ byte bitfield = buffer.readByte();
+
+ boolean noLocal = (bitfield & 0x01) == 0x01;
+ boolean noAck = (bitfield & 0x02) == 0x02;
+ boolean exclusive = (bitfield & 0x04) == 0x04;
+ boolean nowait = (bitfield & 0x08) == 0x08;
+ FieldTable arguments = EncodingUtils.readFieldTable(buffer);
+ return dispatcher.basicConsume(channelId, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
index b68de3a8de..b019574a6b 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
@@ -45,7 +45,7 @@ public class BasicConsumeOkBody extends AMQMethodBodyImpl implements EncodableAM
// Constructor
public BasicConsumeOkBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _consumerTag = readAMQShortString( buffer );
+ _consumerTag = buffer.readAMQShortString();
}
public BasicConsumeOkBody(
@@ -96,4 +96,10 @@ public class BasicConsumeOkBody extends AMQMethodBodyImpl implements EncodableAM
return buf.toString();
}
+ public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ throws IOException
+ {
+ AMQShortString consumerTag = buffer.readAMQShortString();
+ return dispatcher.basicConsumeOk(channelId, consumerTag);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
index 987b8106d8..76cd9bfff4 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
@@ -49,11 +49,11 @@ public class BasicDeliverBody extends AMQMethodBodyImpl implements EncodableAMQD
// Constructor
public BasicDeliverBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _consumerTag = readAMQShortString( buffer );
- _deliveryTag = readLong( buffer );
- _bitfield0 = readBitfield( buffer );
- _exchange = readAMQShortString( buffer );
- _routingKey = readAMQShortString( buffer );
+ _consumerTag = buffer.readAMQShortString();
+ _deliveryTag = buffer.readLong();
+ _bitfield0 = buffer.readByte();
+ _exchange = buffer.readAMQShortString();
+ _routingKey = buffer.readAMQShortString();
}
public BasicDeliverBody(
@@ -152,4 +152,16 @@ public class BasicDeliverBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
+ public static <T> T process(final int channelId,
+ final MarkableDataInput buffer,
+ final MethodProcessor<T> dispatcher) throws IOException
+ {
+
+ AMQShortString consumerTag = buffer.readAMQShortString();
+ long deliveryTag = buffer.readLong();
+ boolean redelivered = (buffer.readByte() & 0x01) != 0;
+ AMQShortString exchange = buffer.readAMQShortString();
+ AMQShortString routingKey = buffer.readAMQShortString();
+ return dispatcher.basicDeliver(channelId, consumerTag, deliveryTag, redelivered, exchange, routingKey);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java
index 8b6842463e..2ebde34648 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java
@@ -47,9 +47,9 @@ public class BasicGetBody extends AMQMethodBodyImpl implements EncodableAMQDataB
// Constructor
public BasicGetBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _ticket = readUnsignedShort( buffer );
- _queue = readAMQShortString( buffer );
- _bitfield0 = readBitfield( buffer );
+ _ticket = buffer.readUnsignedShort();
+ _queue = buffer.readAMQShortString();
+ _bitfield0 = buffer.readByte();
}
public BasicGetBody(
@@ -125,4 +125,13 @@ public class BasicGetBody extends AMQMethodBodyImpl implements EncodableAMQDataB
return buf.toString();
}
+ public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ throws IOException
+ {
+
+ int ticket = buffer.readUnsignedShort();
+ AMQShortString queue = buffer.readAMQShortString();
+ boolean noAck = (buffer.readByte() & 0x01) != 0;
+ return dispatcher.basicGet(channelId, queue, noAck);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java
index c85876b260..508c3f8e66 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java
@@ -45,7 +45,7 @@ public class BasicGetEmptyBody extends AMQMethodBodyImpl implements EncodableAMQ
// Constructor
public BasicGetEmptyBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _clusterId = readAMQShortString( buffer );
+ _clusterId = buffer.readAMQShortString();
}
public BasicGetEmptyBody(
@@ -89,11 +89,18 @@ public class BasicGetEmptyBody extends AMQMethodBodyImpl implements EncodableAMQ
public String toString()
{
- StringBuilder buf = new StringBuilder("[BasicGetEmptyBodyImpl: ");
+ StringBuilder buf = new StringBuilder("[BasicGetEmptyBody: ");
buf.append( "clusterId=" );
buf.append( getClusterId() );
buf.append("]");
return buf.toString();
}
+ public static <T> T process(final int channelId,
+ final MarkableDataInput buffer,
+ final MethodProcessor<T> dispatcher) throws IOException
+ {
+ AMQShortString clusterId = buffer.readAMQShortString();
+ return dispatcher.basicGetEmpty(channelId);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java
index d3ba82b0d3..4020d8fb23 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java
@@ -49,11 +49,11 @@ public class BasicGetOkBody extends AMQMethodBodyImpl implements EncodableAMQDat
// Constructor
public BasicGetOkBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _deliveryTag = readLong( buffer );
- _bitfield0 = readBitfield( buffer );
- _exchange = readAMQShortString( buffer );
- _routingKey = readAMQShortString( buffer );
- _messageCount = readUnsignedInteger( buffer );
+ _deliveryTag = buffer.readLong();
+ _bitfield0 = buffer.readByte();
+ _exchange = buffer.readAMQShortString();
+ _routingKey = buffer.readAMQShortString();
+ _messageCount = EncodingUtils.readUnsignedInteger(buffer);
}
public BasicGetOkBody(
@@ -151,4 +151,15 @@ public class BasicGetOkBody extends AMQMethodBodyImpl implements EncodableAMQDat
return buf.toString();
}
+ public static <T> T process(final int channelId,
+ final MarkableDataInput buffer,
+ final MethodProcessor<T> dispatcher) throws IOException
+ {
+ long deliveryTag = buffer.readLong();
+ boolean redelivered = (buffer.readByte() & 0x01) != 0;
+ AMQShortString exchange = buffer.readAMQShortString();
+ AMQShortString routingKey = buffer.readAMQShortString();
+ long messageCount = EncodingUtils.readUnsignedInteger(buffer);
+ return dispatcher.basicGetOk(channelId, deliveryTag, redelivered, exchange, routingKey, messageCount);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java
index 7d24492395..7920da8405 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java
@@ -48,10 +48,10 @@ public class BasicPublishBody extends AMQMethodBodyImpl implements EncodableAMQD
// Constructor
public BasicPublishBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _ticket = readUnsignedShort( buffer );
- _exchange = readAMQShortString( buffer );
- _routingKey = readAMQShortString( buffer );
- _bitfield0 = readBitfield( buffer );
+ _ticket = buffer.readUnsignedShort();
+ _exchange = buffer.readAMQShortString();
+ _routingKey = buffer.readAMQShortString();
+ _bitfield0 = buffer.readByte();
}
public BasicPublishBody(
@@ -151,4 +151,18 @@ public class BasicPublishBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
+ public static <T> T process(final int channelId,
+ final MarkableDataInput buffer,
+ final MethodProcessor<T> dispatcher) throws IOException
+ {
+
+ int ticket = buffer.readUnsignedShort();
+ AMQShortString exchange = buffer.readAMQShortString();
+ AMQShortString routingKey = buffer.readAMQShortString();
+ byte bitfield = buffer.readByte();
+
+ boolean mandatory = (bitfield & 0x01) != 0;
+ boolean immediate = (bitfield & 0x02) != 0;
+ return dispatcher.basicPublish(channelId, exchange, routingKey, mandatory, immediate);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java
index 76b30d1e3c..0843c5ccd7 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java
@@ -47,9 +47,9 @@ public class BasicQosBody extends AMQMethodBodyImpl implements EncodableAMQDataB
// Constructor
public BasicQosBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _prefetchSize = readUnsignedInteger( buffer );
- _prefetchCount = readUnsignedShort( buffer );
- _bitfield0 = readBitfield( buffer );
+ _prefetchSize = EncodingUtils.readUnsignedInteger(buffer);
+ _prefetchCount = buffer.readUnsignedShort();
+ _bitfield0 = buffer.readByte();
}
public BasicQosBody(
@@ -124,4 +124,14 @@ public class BasicQosBody extends AMQMethodBodyImpl implements EncodableAMQDataB
return buf.toString();
}
+ public static <T> T process(final int channelId,
+ final MarkableDataInput buffer,
+ final MethodProcessor<T> dispatcher) throws IOException
+ {
+
+ long prefetchSize = EncodingUtils.readUnsignedInteger(buffer);
+ int prefetchCount = buffer.readUnsignedShort();
+ boolean global = (buffer.readByte() & 0x01) == 0x01;
+ return dispatcher.basicQos(channelId, prefetchSize, prefetchCount, global);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java
index fdda88534c..739470c658 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java
@@ -45,7 +45,7 @@ public class BasicRecoverBody extends AMQMethodBodyImpl implements EncodableAMQD
// Constructor
public BasicRecoverBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _bitfield0 = readBitfield( buffer );
+ _bitfield0 = buffer.readByte();
}
public BasicRecoverBody(
@@ -100,4 +100,14 @@ public class BasicRecoverBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
+ public static <T> T process(final int channelId,
+ final MarkableDataInput in,
+ final ProtocolVersion protocolVersion,
+ final MethodProcessor<T> dispatcher) throws IOException
+ {
+ boolean requeue = (in.readByte() & 0x01) == 0x01;
+ boolean sync = (ProtocolVersion.v8_0.equals(protocolVersion));
+
+ return dispatcher.basicRecover(channelId, requeue, sync);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java
index 4fa98ac2dc..5826bd9d16 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java
@@ -46,7 +46,7 @@ public class BasicRecoverSyncBody extends AMQMethodBodyImpl implements Encodable
public BasicRecoverSyncBody(MarkableDataInput buffer, ProtocolVersion protocolVersion) throws AMQFrameDecodingException, IOException
{
_methodId = ProtocolVersion.v0_9.equals(protocolVersion) ? 102 : 110;
- _bitfield0 = readBitfield( buffer );
+ _bitfield0 = buffer.readByte();
}
public BasicRecoverSyncBody(ProtocolVersion protocolVersion,
@@ -103,4 +103,11 @@ public class BasicRecoverSyncBody extends AMQMethodBodyImpl implements Encodable
return buf.toString();
}
+ public static <T> T process(final int channelId,
+ final MarkableDataInput in,
+ final MethodProcessor<T> dispatcher) throws IOException
+ {
+ boolean requeue = (in.readByte() & 0x01) == 0x01;
+ return dispatcher.basicRecover(channelId, requeue, true);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncOkBody.java
index 65c15d8cd7..dc60d53952 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncOkBody.java
@@ -31,7 +31,6 @@ import java.io.DataOutput;
import java.io.IOException;
import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.MarkableDataInput;
public class BasicRecoverSyncOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java
index 162fae9dda..83f2727a51 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java
@@ -46,8 +46,8 @@ public class BasicRejectBody extends AMQMethodBodyImpl implements EncodableAMQDa
// Constructor
public BasicRejectBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _deliveryTag = readLong( buffer );
- _bitfield0 = readBitfield( buffer );
+ _deliveryTag = buffer.readLong();
+ _bitfield0 = buffer.readByte();
}
public BasicRejectBody(
@@ -112,4 +112,13 @@ public class BasicRejectBody extends AMQMethodBodyImpl implements EncodableAMQDa
return buf.toString();
}
+ public static <T> T process(final int channelId,
+ final MarkableDataInput buffer,
+ final MethodProcessor<T> dispatcher) throws IOException
+ {
+
+ long deliveryTag = buffer.readLong();
+ boolean requeue = (buffer.readByte() & 0x01) != 0;
+ return dispatcher.basicReject(channelId, deliveryTag, requeue);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java
index 073bbceb06..67d6c77312 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java
@@ -48,10 +48,10 @@ public class BasicReturnBody extends AMQMethodBodyImpl implements EncodableAMQDa
// Constructor
public BasicReturnBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _replyCode = readUnsignedShort( buffer );
- _replyText = readAMQShortString( buffer );
- _exchange = readAMQShortString( buffer );
- _routingKey = readAMQShortString( buffer );
+ _replyCode = buffer.readUnsignedShort();
+ _replyText = buffer.readAMQShortString();
+ _exchange = buffer.readAMQShortString();
+ _routingKey = buffer.readAMQShortString();
}
public BasicReturnBody(
@@ -134,4 +134,15 @@ public class BasicReturnBody extends AMQMethodBodyImpl implements EncodableAMQDa
return buf.toString();
}
+ public static <T> T process(final int channelId,
+ final MarkableDataInput buffer,
+ final MethodProcessor<T> dispatcher) throws IOException
+ {
+
+ int replyCode = buffer.readUnsignedShort();
+ AMQShortString replyText = buffer.readAMQShortString();
+ AMQShortString exchange = buffer.readAMQShortString();
+ AMQShortString routingKey = buffer.readAMQShortString();
+ return dispatcher.basicReturn(channelId, replyCode, replyText, exchange, routingKey);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java
deleted file mode 100644
index 554e9373d8..0000000000
--- a/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.framing;
-
-import org.apache.qpid.codec.MarkableDataInput;
-
-import java.io.IOException;
-
-/**
- * Any class that is capable of turning a stream of bytes into an AMQ structure must implement this interface.
- */
-public interface BodyFactory
-{
- AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException, IOException;
-}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java
index 9f0e5ecdf5..d5c535b099 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java
@@ -47,9 +47,9 @@ public class ChannelAlertBody extends AMQMethodBodyImpl implements EncodableAMQD
// Constructor
public ChannelAlertBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _replyCode = readUnsignedShort( buffer );
- _replyText = readAMQShortString( buffer );
- _details = readFieldTable( buffer );
+ _replyCode = buffer.readUnsignedShort();
+ _replyText = buffer.readAMQShortString();
+ _details = EncodingUtils.readFieldTable(buffer);
}
public ChannelAlertBody(
@@ -121,4 +121,13 @@ public class ChannelAlertBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
+ public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ throws IOException, AMQFrameDecodingException
+ {
+
+ int replyCode = buffer.readUnsignedShort();
+ AMQShortString replyText = buffer.readAMQShortString();
+ FieldTable details = EncodingUtils.readFieldTable(buffer);
+ return dispatcher.channelAlert(channelId, replyCode, replyText, details);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java
index 7c969ba3bb..ea1536ed2b 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java
@@ -48,10 +48,10 @@ public class ChannelCloseBody extends AMQMethodBodyImpl implements EncodableAMQD
// Constructor
public ChannelCloseBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _replyCode = readUnsignedShort( buffer );
- _replyText = readAMQShortString( buffer );
- _classId = readUnsignedShort( buffer );
- _methodId = readUnsignedShort( buffer );
+ _replyCode = buffer.readUnsignedShort();
+ _replyText = buffer.readAMQShortString();
+ _classId = buffer.readUnsignedShort();
+ _methodId = buffer.readUnsignedShort();
}
public ChannelCloseBody(
@@ -132,4 +132,15 @@ public class ChannelCloseBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
+ public static <T> T process(final int channelId,
+ final MarkableDataInput buffer,
+ final MethodProcessor<T> dispatcher) throws IOException
+ {
+
+ int replyCode = buffer.readUnsignedShort();
+ AMQShortString replyText = buffer.readAMQShortString();
+ int classId = buffer.readUnsignedShort();
+ int methodId = buffer.readUnsignedShort();
+ return dispatcher.channelClose(channelId, replyCode, replyText, classId, methodId);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseOkBody.java
index d3d7dbc79b..e9b1572eef 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseOkBody.java
@@ -35,6 +35,7 @@ import org.apache.qpid.codec.MarkableDataInput;
public class ChannelCloseOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
+ public static final ChannelCloseOkBody INSTANCE = new ChannelCloseOkBody();
public static final int CLASS_ID = 20;
public static final int METHOD_ID = 41;
@@ -46,8 +47,7 @@ public class ChannelCloseOkBody extends AMQMethodBodyImpl implements EncodableAM
{
}
- public ChannelCloseOkBody(
- )
+ private ChannelCloseOkBody()
{
}
@@ -64,8 +64,7 @@ public class ChannelCloseOkBody extends AMQMethodBodyImpl implements EncodableAM
protected int getBodySize()
{
- int size = 0;
- return size;
+ return 0;
}
public void writeMethodPayload(DataOutput buffer) throws IOException
@@ -79,9 +78,8 @@ public class ChannelCloseOkBody extends AMQMethodBodyImpl implements EncodableAM
public String toString()
{
- StringBuilder buf = new StringBuilder("[ChannelCloseOkBodyImpl: ");
- buf.append("]");
- return buf.toString();
+ return "[ChannelCloseOkBody]";
+
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java
index 9973e45e5f..e70eb0ea35 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java
@@ -40,24 +40,17 @@ public class ChannelFlowBody extends AMQMethodBodyImpl implements EncodableAMQDa
public static final int METHOD_ID = 20;
// Fields declared in specification
- private final byte _bitfield0; // [active]
+ private final boolean _active; // [active]
// Constructor
public ChannelFlowBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _bitfield0 = readBitfield( buffer );
+ _active = (buffer.readByte() & 0x01) == 0x01;
}
- public ChannelFlowBody(
- boolean active
- )
+ public ChannelFlowBody(boolean active)
{
- byte bitfield0 = (byte)0;
- if( active )
- {
- bitfield0 = (byte) (((int) bitfield0) | (1 << 0));
- }
- _bitfield0 = bitfield0;
+ _active = active;
}
public int getClazz()
@@ -72,18 +65,17 @@ public class ChannelFlowBody extends AMQMethodBodyImpl implements EncodableAMQDa
public final boolean getActive()
{
- return (((int)(_bitfield0)) & ( 1 << 0)) != 0;
+ return _active;
}
protected int getBodySize()
{
- int size = 1;
- return size;
+ return 1;
}
public void writeMethodPayload(DataOutput buffer) throws IOException
{
- writeBitfield( buffer, _bitfield0 );
+ writeBitfield( buffer, _active ? (byte)1 : (byte)0);
}
public boolean execute(MethodDispatcher dispatcher, int channelId) throws AMQException
@@ -100,4 +92,11 @@ public class ChannelFlowBody extends AMQMethodBodyImpl implements EncodableAMQDa
return buf.toString();
}
+ public static <T> T process(final int channelId,
+ final MarkableDataInput buffer,
+ final MethodProcessor<T> dispatcher) throws IOException
+ {
+ boolean active = (buffer.readByte() & 0x01) == 0x01;
+ return dispatcher.channelFlow(channelId, active);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java
index 20c793c3f4..13bdf332d2 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java
@@ -40,24 +40,17 @@ public class ChannelFlowOkBody extends AMQMethodBodyImpl implements EncodableAMQ
public static final int METHOD_ID = 21;
// Fields declared in specification
- private final byte _bitfield0; // [active]
+ private final boolean _active; // [active]
// Constructor
public ChannelFlowOkBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _bitfield0 = readBitfield( buffer );
+ _active = (buffer.readByte() & 0x01) == 0x01;
}
- public ChannelFlowOkBody(
- boolean active
- )
+ public ChannelFlowOkBody(boolean active)
{
- byte bitfield0 = (byte)0;
- if( active )
- {
- bitfield0 = (byte) (((int) bitfield0) | (1 << 0));
- }
- _bitfield0 = bitfield0;
+ _active = active;
}
public int getClazz()
@@ -72,7 +65,7 @@ public class ChannelFlowOkBody extends AMQMethodBodyImpl implements EncodableAMQ
public final boolean getActive()
{
- return (((int)(_bitfield0)) & ( 1 << 0)) != 0;
+ return _active;
}
protected int getBodySize()
@@ -83,7 +76,7 @@ public class ChannelFlowOkBody extends AMQMethodBodyImpl implements EncodableAMQ
public void writeMethodPayload(DataOutput buffer) throws IOException
{
- writeBitfield( buffer, _bitfield0 );
+ writeBitfield( buffer, _active ? (byte)1 : (byte)0 );
}
public boolean execute(MethodDispatcher dispatcher, int channelId) throws AMQException
@@ -100,4 +93,10 @@ public class ChannelFlowOkBody extends AMQMethodBodyImpl implements EncodableAMQ
return buf.toString();
}
+ public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ throws IOException
+ {
+ boolean active = (buffer.readByte() & 0x01) == 0x01;
+ return dispatcher.channelFlowOk(channelId, active);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java
index a04856d49b..f96eb9344b 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java
@@ -39,20 +39,17 @@ public class ChannelOpenBody extends AMQMethodBodyImpl implements EncodableAMQDa
public static final int CLASS_ID = 20;
public static final int METHOD_ID = 10;
- // Fields declared in specification
- private final AMQShortString _outOfBand; // [outOfBand]
// Constructor
public ChannelOpenBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _outOfBand = readAMQShortString( buffer );
+ // ignore unused OOB string
+ buffer.readAMQShortString();
}
- public ChannelOpenBody(
- AMQShortString outOfBand
- )
+ public ChannelOpenBody()
{
- _outOfBand = outOfBand;
+
}
public int getClazz()
@@ -65,21 +62,14 @@ public class ChannelOpenBody extends AMQMethodBodyImpl implements EncodableAMQDa
return METHOD_ID;
}
- public final AMQShortString getOutOfBand()
- {
- return _outOfBand;
- }
-
protected int getBodySize()
{
- int size = 0;
- size += getSizeOf( _outOfBand );
- return size;
+ return 1;
}
public void writeMethodPayload(DataOutput buffer) throws IOException
{
- writeAMQShortString( buffer, _outOfBand );
+ writeAMQShortString( buffer, null );
}
public boolean execute(MethodDispatcher dispatcher, int channelId) throws AMQException
@@ -89,11 +79,14 @@ public class ChannelOpenBody extends AMQMethodBodyImpl implements EncodableAMQDa
public String toString()
{
- StringBuilder buf = new StringBuilder("[ChannelOpenBodyImpl: ");
- buf.append( "outOfBand=" );
- buf.append( getOutOfBand() );
- buf.append("]");
- return buf.toString();
+ return "[ChannelOpenBody] ";
}
+ public static <T> T process(final int channelId,
+ final MarkableDataInput buffer,
+ final MethodProcessor<T> dispatcher) throws IOException
+ {
+ buffer.readAMQShortString();
+ return dispatcher.channelOpen(channelId);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java
index a60e7f331b..5cf4d91970 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java
@@ -96,4 +96,16 @@ public class ChannelOpenOkBody extends AMQMethodBodyImpl implements EncodableAMQ
return "[ChannelOpenOkBody]";
}
+ public static <T> T process(final int channelId,
+ final MarkableDataInput in,
+ final ProtocolVersion protocolVersion,
+ final MethodProcessor<T> dispatcher) throws IOException
+ {
+ if(!ProtocolVersion.v8_0.equals(protocolVersion))
+ {
+ EncodingUtils.readBytes(in);
+ }
+
+ return dispatcher.channelOpenOk(channelId);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java
index 64388a42f3..02f214cee9 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java
@@ -49,10 +49,10 @@ public class ConnectionCloseBody extends AMQMethodBodyImpl implements EncodableA
public ConnectionCloseBody(MarkableDataInput buffer, ProtocolVersion protocolVersion) throws AMQFrameDecodingException, IOException
{
_ownMethodId = ProtocolVersion.v8_0.equals(protocolVersion) ? 60 : 50;
- _replyCode = readUnsignedShort( buffer );
- _replyText = readAMQShortString( buffer );
- _classId = readUnsignedShort( buffer );
- _methodId = readUnsignedShort( buffer );
+ _replyCode = buffer.readUnsignedShort();
+ _replyText = buffer.readAMQShortString();
+ _classId = buffer.readUnsignedShort();
+ _methodId = buffer.readUnsignedShort();
}
public ConnectionCloseBody(ProtocolVersion protocolVersion,
@@ -134,4 +134,12 @@ public class ConnectionCloseBody extends AMQMethodBodyImpl implements EncodableA
return buf.toString();
}
+ public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException
+ {
+ int replyCode = buffer.readUnsignedShort();
+ AMQShortString replyText = buffer.readAMQShortString();
+ int classId = buffer.readUnsignedShort();
+ int methodId = buffer.readUnsignedShort();
+ return dispatcher.connectionClose(replyCode, replyText, classId, methodId);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java
index df20a1e0b3..f9f55446dd 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java
@@ -42,14 +42,14 @@ public class ConnectionOpenBody extends AMQMethodBodyImpl implements EncodableAM
// Fields declared in specification
private final AMQShortString _virtualHost; // [virtualHost]
private final AMQShortString _capabilities; // [capabilities]
- private final byte _bitfield0; // [insist]
+ private final boolean _insist; // [insist]
// Constructor
public ConnectionOpenBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _virtualHost = readAMQShortString( buffer );
- _capabilities = readAMQShortString( buffer );
- _bitfield0 = readBitfield( buffer );
+ _virtualHost = buffer.readAMQShortString();
+ _capabilities = buffer.readAMQShortString();
+ _insist = (buffer.readByte() & 0x01) == 0x01;
}
public ConnectionOpenBody(
@@ -60,12 +60,7 @@ public class ConnectionOpenBody extends AMQMethodBodyImpl implements EncodableAM
{
_virtualHost = virtualHost;
_capabilities = capabilities;
- byte bitfield0 = (byte)0;
- if( insist )
- {
- bitfield0 = (byte) (((int) bitfield0) | (1 << 0));
- }
- _bitfield0 = bitfield0;
+ _insist = insist;
}
public int getClazz()
@@ -88,7 +83,7 @@ public class ConnectionOpenBody extends AMQMethodBodyImpl implements EncodableAM
}
public final boolean getInsist()
{
- return (((int)(_bitfield0)) & ( 1 << 0)) != 0;
+ return _insist;
}
protected int getBodySize()
@@ -103,7 +98,7 @@ public class ConnectionOpenBody extends AMQMethodBodyImpl implements EncodableAM
{
writeAMQShortString( buffer, _virtualHost );
writeAMQShortString( buffer, _capabilities );
- writeBitfield( buffer, _bitfield0 );
+ writeBitfield( buffer, _insist ? (byte)1 : (byte)0);
}
public boolean execute(MethodDispatcher dispatcher, int channelId) throws AMQException
@@ -126,4 +121,12 @@ public class ConnectionOpenBody extends AMQMethodBodyImpl implements EncodableAM
return buf.toString();
}
+ public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException
+ {
+
+ AMQShortString virtualHost = buffer.readAMQShortString();
+ AMQShortString capabilities = buffer.readAMQShortString();
+ boolean insist = (buffer.readByte() & 0x01) == 0x01;
+ return dispatcher.connectionOpen(virtualHost, capabilities, insist);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java
index 68cb424f3b..3f04da7a29 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java
@@ -45,7 +45,7 @@ public class ConnectionOpenOkBody extends AMQMethodBodyImpl implements Encodable
// Constructor
public ConnectionOpenOkBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _knownHosts = readAMQShortString( buffer );
+ _knownHosts = buffer.readAMQShortString();
}
public ConnectionOpenOkBody(
@@ -96,4 +96,10 @@ public class ConnectionOpenOkBody extends AMQMethodBodyImpl implements Encodable
return buf.toString();
}
+ public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException
+ {
+ AMQShortString knownHosts = buffer.readAMQShortString();
+ return dispatcher.connectionOpenOk(knownHosts);
+
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java
index 569e9d1cc1..80c655683f 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java
@@ -47,8 +47,8 @@ public class ConnectionRedirectBody extends AMQMethodBodyImpl implements Encodab
public ConnectionRedirectBody(MarkableDataInput buffer, ProtocolVersion protocolVersion) throws AMQFrameDecodingException, IOException
{
_ownMethodId = ProtocolVersion.v8_0.equals(protocolVersion) ? 50 : 42;
- _host = readAMQShortString( buffer );
- _knownHosts = readAMQShortString( buffer );
+ _host = buffer.readAMQShortString();
+ _knownHosts = buffer.readAMQShortString();
}
public ConnectionRedirectBody(ProtocolVersion protocolVersion, AMQShortString host, AMQShortString knownHosts)
@@ -108,4 +108,10 @@ public class ConnectionRedirectBody extends AMQMethodBodyImpl implements Encodab
return buf.toString();
}
+ public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException
+ {
+ AMQShortString host = buffer.readAMQShortString();
+ AMQShortString knownHosts = buffer.readAMQShortString();
+ return dispatcher.connectionRedirect(host, knownHosts);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java
index 18faeacf34..ca208d5a89 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java
@@ -45,7 +45,7 @@ public class ConnectionSecureBody extends AMQMethodBodyImpl implements Encodable
// Constructor
public ConnectionSecureBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _challenge = readBytes( buffer );
+ _challenge = EncodingUtils.readBytes(buffer);
}
public ConnectionSecureBody(
@@ -96,4 +96,11 @@ public class ConnectionSecureBody extends AMQMethodBodyImpl implements Encodable
return buf.toString();
}
+ public static <T> T process(final MarkableDataInput in, final MethodProcessor<T> dispatcher)
+ throws IOException, AMQFrameDecodingException
+
+ {
+ byte[] challenge = EncodingUtils.readBytes(in);
+ return dispatcher.connectionSecure(challenge);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java
index 1131567b06..0a2bfa613e 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java
@@ -45,7 +45,7 @@ public class ConnectionSecureOkBody extends AMQMethodBodyImpl implements Encodab
// Constructor
public ConnectionSecureOkBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _response = readBytes( buffer );
+ _response = EncodingUtils.readBytes(buffer);
}
public ConnectionSecureOkBody(
@@ -96,4 +96,9 @@ public class ConnectionSecureOkBody extends AMQMethodBodyImpl implements Encodab
return buf.toString();
}
+ public static <T> T process(final MarkableDataInput in, final MethodProcessor<T> dispatcher) throws IOException
+ {
+ byte[] response = EncodingUtils.readBytes(in);
+ return dispatcher.connectionSecureOk(response);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java
index a445dd953a..17a568d737 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java
@@ -46,16 +46,6 @@ public class ConnectionStartBody extends AMQMethodBodyImpl implements EncodableA
private final byte[] _mechanisms; // [mechanisms]
private final byte[] _locales; // [locales]
- // Constructor
- public ConnectionStartBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
- {
- _versionMajor = readUnsignedByte( buffer );
- _versionMinor = readUnsignedByte( buffer );
- _serverProperties = readFieldTable( buffer );
- _mechanisms = readBytes( buffer );
- _locales = readBytes( buffer );
- }
-
public ConnectionStartBody(
short versionMajor,
short versionMinor,
@@ -146,4 +136,16 @@ public class ConnectionStartBody extends AMQMethodBodyImpl implements EncodableA
return buf.toString();
}
+ public static <T> T process(final MarkableDataInput in, final MethodProcessor<T> dispatcher)
+ throws IOException, AMQFrameDecodingException
+ {
+ short versionMajor = (short) in.readUnsignedByte();
+ short versionMinor = (short) in.readUnsignedByte();
+ FieldTable serverProperties = EncodingUtils.readFieldTable(in);
+ byte[] mechanisms = EncodingUtils.readBytes(in);
+ byte[] locales = EncodingUtils.readBytes(in);
+
+
+ return dispatcher.connectionStart(versionMajor, versionMinor, serverProperties, mechanisms, locales);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java
index e2284a7a5e..ba8182e569 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java
@@ -45,15 +45,6 @@ public class ConnectionStartOkBody extends AMQMethodBodyImpl implements Encodabl
private final byte[] _response; // [response]
private final AMQShortString _locale; // [locale]
- // Constructor
- public ConnectionStartOkBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
- {
- _clientProperties = readFieldTable( buffer );
- _mechanism = readAMQShortString( buffer );
- _response = readBytes( buffer );
- _locale = readAMQShortString( buffer );
- }
-
public ConnectionStartOkBody(
FieldTable clientProperties,
AMQShortString mechanism,
@@ -135,4 +126,15 @@ public class ConnectionStartOkBody extends AMQMethodBodyImpl implements Encodabl
return buf.toString();
}
+ public static <T> T process(final MarkableDataInput in, final MethodProcessor<T> dispatcher)
+ throws IOException, AMQFrameDecodingException
+ {
+
+ FieldTable clientProperties = EncodingUtils.readFieldTable(in);
+ AMQShortString mechanism = in.readAMQShortString();
+ byte[] response = EncodingUtils.readBytes(in);
+ AMQShortString locale = in.readAMQShortString();
+
+ return dispatcher.connectionStartOk(clientProperties, mechanism, response, locale);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java
index 2b6a67c4f6..2ca8e57e18 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java
@@ -47,9 +47,9 @@ public class ConnectionTuneBody extends AMQMethodBodyImpl implements EncodableAM
// Constructor
public ConnectionTuneBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _channelMax = readUnsignedShort( buffer );
- _frameMax = readUnsignedInteger( buffer );
- _heartbeat = readUnsignedShort( buffer );
+ _channelMax = buffer.readUnsignedShort();
+ _frameMax = EncodingUtils.readUnsignedInteger(buffer);
+ _heartbeat = buffer.readUnsignedShort();
}
public ConnectionTuneBody(
@@ -119,4 +119,12 @@ public class ConnectionTuneBody extends AMQMethodBodyImpl implements EncodableAM
return buf.toString();
}
+ public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException
+ {
+
+ int channelMax = buffer.readUnsignedShort();
+ long frameMax = EncodingUtils.readUnsignedInteger(buffer);
+ int heartbeat = buffer.readUnsignedShort();
+ return dispatcher.connectionTune(channelMax, frameMax, heartbeat);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java
index 84ab6027e5..7a259b6419 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java
@@ -47,9 +47,9 @@ public class ConnectionTuneOkBody extends AMQMethodBodyImpl implements Encodable
// Constructor
public ConnectionTuneOkBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _channelMax = readUnsignedShort( buffer );
- _frameMax = readUnsignedInteger( buffer );
- _heartbeat = readUnsignedShort( buffer );
+ _channelMax = buffer.readUnsignedShort();
+ _frameMax = EncodingUtils.readUnsignedInteger(buffer);
+ _heartbeat = buffer.readUnsignedShort();
}
public ConnectionTuneOkBody(
@@ -119,4 +119,12 @@ public class ConnectionTuneOkBody extends AMQMethodBodyImpl implements Encodable
return buf.toString();
}
+ public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException
+ {
+
+ int channelMax = buffer.readUnsignedShort();
+ long frameMax = EncodingUtils.readUnsignedInteger(buffer);
+ int heartbeat = buffer.readUnsignedShort();
+ return dispatcher.connectionTuneOk(channelMax, frameMax, heartbeat);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
index 6d6ec708d0..dc345a6cc6 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
@@ -20,15 +20,16 @@
*/
package org.apache.qpid.framing;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.MarkableDataInput;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+
public class ContentBody implements AMQBody
{
public static final byte TYPE = 3;
@@ -91,6 +92,16 @@ public class ContentBody implements AMQBody
return _payload;
}
+ public static <T> T process(final int channel,
+ final MarkableDataInput in,
+ final MethodProcessor<T> methodProcessor, final long bodySize)
+ throws IOException
+ {
+ byte[] payload = new byte[(int)bodySize];
+ in.readFully(payload);
+ return methodProcessor.messageContent(channel, payload);
+ }
+
private static class BufferContentBody implements AMQBody
{
private final int _length;
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java
deleted file mode 100644
index 10df105ee6..0000000000
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.framing;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.codec.MarkableDataInput;
-
-import java.io.IOException;
-
-public class ContentBodyFactory implements BodyFactory
-{
- private static final Logger _log = LoggerFactory.getLogger(AMQMethodBodyFactory.class);
-
- private static final ContentBodyFactory _instance = new ContentBodyFactory();
-
- public static ContentBodyFactory getInstance()
- {
- return _instance;
- }
-
- private ContentBodyFactory()
- {
- _log.debug("Creating content body factory");
- }
-
- public AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException, IOException
- {
- return new ContentBody(in, bodySize);
- }
-}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
index f2a443d5fd..081b4bdfee 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
@@ -20,54 +20,45 @@
*/
package org.apache.qpid.framing;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.MarkableDataInput;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+
public class ContentHeaderBody implements AMQBody
{
public static final byte TYPE = 2;
+ public static final int CLASS_ID = 60;
- private int classId;
-
- private int weight;
-
- private long bodySize;
+ private long _bodySize;
/** must never be null */
- private BasicContentHeaderProperties properties;
-
- public ContentHeaderBody()
- {
- }
+ private BasicContentHeaderProperties _properties;
public ContentHeaderBody(DataInput buffer, long size) throws AMQFrameDecodingException, IOException
{
- classId = buffer.readUnsignedShort();
- weight = buffer.readUnsignedShort();
- bodySize = buffer.readLong();
+ buffer.readUnsignedShort();
+ buffer.readUnsignedShort();
+ _bodySize = buffer.readLong();
int propertyFlags = buffer.readUnsignedShort();
ContentHeaderPropertiesFactory factory = ContentHeaderPropertiesFactory.getInstance();
- properties = factory.createContentHeaderProperties(classId, propertyFlags, buffer, (int)size - 14);
+ _properties = factory.createContentHeaderProperties(CLASS_ID, propertyFlags, buffer, (int)size - 14);
}
-
- public ContentHeaderBody(BasicContentHeaderProperties props, int classId)
+ public ContentHeaderBody(BasicContentHeaderProperties props)
{
- properties = props;
- this.classId = classId;
+ _properties = props;
}
- public ContentHeaderBody(int classId, int weight, BasicContentHeaderProperties props, long bodySize)
+ public ContentHeaderBody(BasicContentHeaderProperties props, long bodySize)
{
- this(props, classId);
- this.weight = weight;
- this.bodySize = bodySize;
+ _properties = props;
+ _bodySize = bodySize;
}
public byte getFrameType()
@@ -95,16 +86,16 @@ public class ContentHeaderBody implements AMQBody
public int getSize()
{
- return 2 + 2 + 8 + 2 + properties.getPropertyListSize();
+ return 2 + 2 + 8 + 2 + _properties.getPropertyListSize();
}
public void writePayload(DataOutput buffer) throws IOException
{
- EncodingUtils.writeUnsignedShort(buffer, classId);
- EncodingUtils.writeUnsignedShort(buffer, weight);
- buffer.writeLong(bodySize);
- EncodingUtils.writeUnsignedShort(buffer, properties.getPropertyFlags());
- properties.writePropertyListPayload(buffer);
+ EncodingUtils.writeUnsignedShort(buffer, CLASS_ID);
+ EncodingUtils.writeUnsignedShort(buffer, 0);
+ buffer.writeLong(_bodySize);
+ EncodingUtils.writeUnsignedShort(buffer, _properties.getPropertyFlags());
+ _properties.writePropertyListPayload(buffer);
}
public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
@@ -113,46 +104,42 @@ public class ContentHeaderBody implements AMQBody
session.contentHeaderReceived(channelId, this);
}
- public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties,
+ public static AMQFrame createAMQFrame(int channelId,
+ BasicContentHeaderProperties properties,
long bodySize)
{
- return new AMQFrame(channelId, new ContentHeaderBody(classId, weight, properties, bodySize));
- }
-
- public static AMQFrame createAMQFrame(int channelId, ContentHeaderBody body)
- {
- return new AMQFrame(channelId, body);
+ return new AMQFrame(channelId, new ContentHeaderBody(properties, bodySize));
}
public BasicContentHeaderProperties getProperties()
{
- return properties;
+ return _properties;
}
public void setProperties(BasicContentHeaderProperties props)
{
- properties = props;
+ _properties = props;
}
@Override
public String toString()
{
return "ContentHeaderBody{" +
- "classId=" + classId +
- ", weight=" + weight +
- ", bodySize=" + bodySize +
- ", properties=" + properties +
+ "classId=" + CLASS_ID +
+ ", weight=" + 0 +
+ ", bodySize=" + _bodySize +
+ ", properties=" + _properties +
'}';
}
public int getClassId()
{
- return classId;
+ return CLASS_ID;
}
public int getWeight()
{
- return weight;
+ return 0;
}
/** unsigned long but java can't handle that anyway when allocating byte array
@@ -160,11 +147,34 @@ public class ContentHeaderBody implements AMQBody
* @return the body size */
public long getBodySize()
{
- return bodySize;
+ return _bodySize;
}
public void setBodySize(long bodySize)
{
- this.bodySize = bodySize;
+ _bodySize = bodySize;
+ }
+
+ public static <T> T process(final int channelId,
+ final MarkableDataInput buffer,
+ final MethodProcessor<T> methodProcessor, final long size)
+ throws IOException, AMQFrameDecodingException
+ {
+
+ int classId = buffer.readUnsignedShort();
+ buffer.readUnsignedShort();
+ long bodySize = buffer.readLong();
+ int propertyFlags = buffer.readUnsignedShort();
+
+ BasicContentHeaderProperties properties;
+
+ if (classId != BasicConsumeBody.CLASS_ID)
+ {
+ throw new AMQFrameDecodingException(null, "Unsupported content header class id: " + classId, null);
+ }
+ properties = new BasicContentHeaderProperties();
+ properties.populatePropertiesFromBuffer(buffer, propertyFlags, (int)(size-14));
+
+ return methodProcessor.messageHeader(channelId, properties, bodySize);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java
deleted file mode 100644
index 83a5211013..0000000000
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.framing;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.codec.MarkableDataInput;
-
-import java.io.IOException;
-
-public class ContentHeaderBodyFactory implements BodyFactory
-{
- private static final Logger _log = LoggerFactory.getLogger(AMQMethodBodyFactory.class);
-
- private static final ContentHeaderBodyFactory _instance = new ContentHeaderBodyFactory();
-
- public static ContentHeaderBodyFactory getInstance()
- {
- return _instance;
- }
-
- private ContentHeaderBodyFactory()
- {
- _log.debug("Creating content header body factory");
- }
-
- public AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException, IOException
- {
- // all content headers are the same - it is only the properties that differ.
- // the content header body further delegates construction of properties
- return new ContentHeaderBody(in, bodySize);
- }
-}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java
index 028e3c83be..8244768fb6 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java
@@ -47,9 +47,9 @@ public class ExchangeBoundBody extends AMQMethodBodyImpl implements EncodableAMQ
// Constructor
public ExchangeBoundBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _exchange = readAMQShortString( buffer );
- _routingKey = readAMQShortString( buffer );
- _queue = readAMQShortString( buffer );
+ _exchange = buffer.readAMQShortString();
+ _routingKey = buffer.readAMQShortString();
+ _queue = buffer.readAMQShortString();
}
public ExchangeBoundBody(
@@ -122,4 +122,13 @@ public class ExchangeBoundBody extends AMQMethodBodyImpl implements EncodableAMQ
return buf.toString();
}
+ public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ throws IOException
+ {
+
+ AMQShortString exchange = buffer.readAMQShortString();
+ AMQShortString routingKey = buffer.readAMQShortString();
+ AMQShortString queue = buffer.readAMQShortString();
+ return dispatcher.exchangeBound(channelId, exchange, routingKey, queue);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java
index fa12cdf2cf..2d89a9f467 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java
@@ -46,8 +46,8 @@ public class ExchangeBoundOkBody extends AMQMethodBodyImpl implements EncodableA
// Constructor
public ExchangeBoundOkBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _replyCode = readUnsignedShort( buffer );
- _replyText = readAMQShortString( buffer );
+ _replyCode = buffer.readUnsignedShort();
+ _replyText = buffer.readAMQShortString();
}
public ExchangeBoundOkBody(
@@ -108,4 +108,12 @@ public class ExchangeBoundOkBody extends AMQMethodBodyImpl implements EncodableA
return buf.toString();
}
+ public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ throws IOException
+ {
+
+ int replyCode = buffer.readUnsignedShort();
+ AMQShortString replyText = buffer.readAMQShortString();
+ return dispatcher.exchangeBoundOk(channelId, replyCode, replyText);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java
index 23f428a381..f96e6d382e 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java
@@ -49,11 +49,11 @@ public class ExchangeDeclareBody extends AMQMethodBodyImpl implements EncodableA
// Constructor
public ExchangeDeclareBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _ticket = readUnsignedShort( buffer );
- _exchange = readAMQShortString( buffer );
- _type = readAMQShortString( buffer );
- _bitfield0 = readBitfield( buffer );
- _arguments = readFieldTable( buffer );
+ _ticket = buffer.readUnsignedShort();
+ _exchange = buffer.readAMQShortString();
+ _type = buffer.readAMQShortString();
+ _bitfield0 = buffer.readByte();
+ _arguments = EncodingUtils.readFieldTable(buffer);
}
public ExchangeDeclareBody(
@@ -204,4 +204,21 @@ public class ExchangeDeclareBody extends AMQMethodBodyImpl implements EncodableA
return buf.toString();
}
+ public static <T> T process(final int channelId,
+ final MarkableDataInput buffer,
+ final MethodProcessor<T> dispatcher) throws IOException, AMQFrameDecodingException
+ {
+
+ int ticket = buffer.readUnsignedShort();
+ AMQShortString exchange = buffer.readAMQShortString();
+ AMQShortString type = buffer.readAMQShortString();
+ byte bitfield = buffer.readByte();
+ boolean passive = (bitfield & 0x1) == 0x1;
+ boolean durable = (bitfield & 0x2) == 0x2;
+ boolean autoDelete = (bitfield & 0x4) == 0x4;
+ boolean internal = (bitfield & 0x8) == 0x8;
+ boolean nowait = (bitfield & 0x10) == 0x10;
+ FieldTable arguments = EncodingUtils.readFieldTable(buffer);
+ return dispatcher.exchangeDeclare(channelId, exchange, type, passive, durable, autoDelete, internal, nowait, arguments);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java
index 98b0ba30f0..771fa63063 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java
@@ -47,9 +47,9 @@ public class ExchangeDeleteBody extends AMQMethodBodyImpl implements EncodableAM
// Constructor
public ExchangeDeleteBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _ticket = readUnsignedShort( buffer );
- _exchange = readAMQShortString( buffer );
- _bitfield0 = readBitfield( buffer );
+ _ticket = buffer.readUnsignedShort();
+ _exchange = buffer.readAMQShortString();
+ _bitfield0 = buffer.readByte();
}
public ExchangeDeleteBody(
@@ -138,4 +138,15 @@ public class ExchangeDeleteBody extends AMQMethodBodyImpl implements EncodableAM
return buf.toString();
}
+ public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ throws IOException
+ {
+
+ int ticket = buffer.readUnsignedShort();
+ AMQShortString exchange = buffer.readAMQShortString();
+ byte bitfield = buffer.readByte();
+ boolean ifUnused = (bitfield & 0x01) == 0x01;
+ boolean nowait = (bitfield & 0x02) == 0x02;
+ return dispatcher.exchangeDelete(channelId, exchange, ifUnused, nowait);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
new file mode 100644
index 0000000000..c8b7d2639d
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
@@ -0,0 +1,503 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+public class FrameCreatingMethodProcessor implements MethodProcessor<AMQFrame>
+{
+ private final MethodRegistry _methodRegistry;
+
+ FrameCreatingMethodProcessor(final MethodRegistry methodRegistry)
+ {
+ _methodRegistry = methodRegistry;
+ }
+
+ @Override
+ public AMQFrame connectionStart(final short versionMajor,
+ final short versionMinor,
+ final FieldTable serverProperties,
+ final byte[] mechanisms,
+ final byte[] locales)
+ {
+ return new AMQFrame(0, new ConnectionStartBody(versionMajor, versionMinor, serverProperties, mechanisms, locales));
+ }
+
+ @Override
+ public AMQFrame connectionStartOk(final FieldTable clientProperties,
+ final AMQShortString mechanism,
+ final byte[] response,
+ final AMQShortString locale)
+ {
+ return new AMQFrame(0, new ConnectionStartOkBody(clientProperties, mechanism, response, locale));
+ }
+
+ @Override
+ public AMQFrame txSelect(final int channelId)
+ {
+ return new AMQFrame(channelId, TxSelectBody.INSTANCE);
+ }
+
+ @Override
+ public AMQFrame txSelectOk(final int channelId)
+ {
+ return new AMQFrame(channelId, TxSelectOkBody.INSTANCE);
+ }
+
+ @Override
+ public AMQFrame txCommit(final int channelId)
+ {
+ return new AMQFrame(channelId, TxCommitBody.INSTANCE);
+ }
+
+ @Override
+ public AMQFrame txCommitOk(final int channelId)
+ {
+ return new AMQFrame(channelId, TxCommitOkBody.INSTANCE);
+ }
+
+ @Override
+ public AMQFrame txRollback(final int channelId)
+ {
+ return new AMQFrame(channelId, TxRollbackBody.INSTANCE);
+ }
+
+ @Override
+ public AMQFrame txRollbackOk(final int channelId)
+ {
+ return new AMQFrame(channelId, TxRollbackOkBody.INSTANCE);
+ }
+
+ @Override
+ public AMQFrame connectionSecure(final byte[] challenge)
+ {
+ return new AMQFrame(0, new ConnectionSecureBody(challenge));
+ }
+
+ @Override
+ public AMQFrame connectionSecureOk(final byte[] response)
+ {
+ return new AMQFrame(0, new ConnectionSecureOkBody(response));
+ }
+
+ @Override
+ public AMQFrame connectionTune(final int channelMax, final long frameMax, final int heartbeat)
+ {
+ return new AMQFrame(0, new ConnectionTuneBody(channelMax, frameMax, heartbeat));
+ }
+
+ @Override
+ public AMQFrame connectionTuneOk(final int channelMax, final long frameMax, final int heartbeat)
+ {
+ return new AMQFrame(0, new ConnectionTuneOkBody(channelMax, frameMax, heartbeat));
+ }
+
+ @Override
+ public AMQFrame connectionOpen(final AMQShortString virtualHost,
+ final AMQShortString capabilities,
+ final boolean insist)
+ {
+ return new AMQFrame(0, new ConnectionOpenBody(virtualHost, capabilities, insist));
+ }
+
+ @Override
+ public AMQFrame connectionOpenOk(final AMQShortString knownHosts)
+ {
+ return new AMQFrame(0, new ConnectionOpenOkBody(knownHosts));
+ }
+
+ @Override
+ public AMQFrame connectionRedirect(final AMQShortString host, final AMQShortString knownHosts)
+ {
+ return new AMQFrame(0, new ConnectionRedirectBody(getProtocolVersion(), host, knownHosts));
+ }
+
+ @Override
+ public AMQFrame connectionClose(final int replyCode,
+ final AMQShortString replyText,
+ final int classId,
+ final int methodId)
+ {
+ return new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), replyCode, replyText, classId, methodId));
+ }
+
+ @Override
+ public AMQFrame connectionCloseOk()
+ {
+ return new AMQFrame(0, ProtocolVersion.v8_0.equals(getProtocolVersion())
+ ? ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_8
+ : ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_9);
+ }
+
+ @Override
+ public AMQFrame channelOpen(final int channelId)
+ {
+ return new AMQFrame(channelId, new ChannelOpenBody());
+ }
+
+ @Override
+ public AMQFrame channelOpenOk(final int channelId)
+ {
+ return new AMQFrame(channelId, ProtocolVersion.v8_0.equals(getProtocolVersion())
+ ? ChannelOpenOkBody.INSTANCE_0_8
+ : ChannelOpenOkBody.INSTANCE_0_9);
+ }
+
+ @Override
+ public AMQFrame channelFlow(final int channelId, final boolean active)
+ {
+ return new AMQFrame(channelId, new ChannelFlowBody(active));
+ }
+
+ @Override
+ public AMQFrame channelFlowOk(final int channelId, final boolean active)
+ {
+ return new AMQFrame(channelId, new ChannelFlowOkBody(active));
+ }
+
+ @Override
+ public AMQFrame channelAlert(final int channelId,
+ final int replyCode,
+ final AMQShortString replyText,
+ final FieldTable details)
+ {
+ return new AMQFrame(channelId, new ChannelAlertBody(replyCode, replyText, details));
+ }
+
+ @Override
+ public AMQFrame channelClose(final int channelId,
+ final int replyCode,
+ final AMQShortString replyText,
+ final int classId,
+ final int methodId)
+ {
+ return new AMQFrame(channelId, new ChannelCloseBody(replyCode, replyText, classId, methodId));
+ }
+
+ @Override
+ public AMQFrame channelCloseOk(final int channelId)
+ {
+ return new AMQFrame(channelId, ChannelCloseOkBody.INSTANCE);
+ }
+
+ @Override
+ public AMQFrame accessRequest(final int channelId,
+ final AMQShortString realm,
+ final boolean exclusive,
+ final boolean passive,
+ final boolean active,
+ final boolean write,
+ final boolean read)
+ {
+ return new AMQFrame(channelId, new AccessRequestBody(realm, exclusive, passive, active, write, read));
+ }
+
+ @Override
+ public AMQFrame accessRequestOk(final int channelId, final int ticket)
+ {
+ return new AMQFrame(channelId, new AccessRequestOkBody(ticket));
+ }
+
+ @Override
+ public AMQFrame exchangeDeclare(final int channelId,
+ final AMQShortString exchange,
+ final AMQShortString type,
+ final boolean passive,
+ final boolean durable,
+ final boolean autoDelete,
+ final boolean internal,
+ final boolean nowait, final FieldTable arguments)
+ {
+ return new AMQFrame(channelId, new ExchangeDeclareBody(0, exchange, type, passive, durable, autoDelete, internal, nowait, arguments));
+ }
+
+ @Override
+ public AMQFrame exchangeDeclareOk(final int channelId)
+ {
+ return new AMQFrame(channelId, new ExchangeDeclareOkBody());
+ }
+
+ @Override
+ public AMQFrame exchangeDelete(final int channelId,
+ final AMQShortString exchange,
+ final boolean ifUnused,
+ final boolean nowait)
+ {
+ return new AMQFrame(channelId, new ExchangeDeleteBody(0, exchange, ifUnused, nowait));
+ }
+
+ @Override
+ public AMQFrame exchangeDeleteOk(final int channelId)
+ {
+ return new AMQFrame(channelId, new ExchangeDeleteOkBody());
+ }
+
+ @Override
+ public AMQFrame exchangeBound(final int channelId,
+ final AMQShortString exchange,
+ final AMQShortString routingKey,
+ final AMQShortString queue)
+ {
+ return new AMQFrame(channelId, new ExchangeBoundBody(exchange, routingKey, queue));
+ }
+
+ @Override
+ public AMQFrame exchangeBoundOk(final int channelId, final int replyCode, final AMQShortString replyText)
+ {
+ return new AMQFrame(channelId, new ExchangeBoundOkBody(replyCode, replyText));
+ }
+
+ @Override
+ public AMQFrame queueBindOk(final int channelId)
+ {
+ return new AMQFrame(channelId, new QueueBindOkBody());
+ }
+
+ @Override
+ public AMQFrame queueUnbindOk(final int channelId)
+ {
+ return new AMQFrame(channelId, new QueueUnbindOkBody());
+ }
+
+ @Override
+ public AMQFrame queueDeclare(final int channelId,
+ final AMQShortString queue,
+ final boolean passive,
+ final boolean durable,
+ final boolean exclusive,
+ final boolean autoDelete,
+ final boolean nowait,
+ final FieldTable arguments)
+ {
+ return new AMQFrame(channelId, new QueueDeclareBody(0, queue, passive, durable, exclusive, autoDelete, nowait, arguments));
+ }
+
+ @Override
+ public AMQFrame queueDeclareOk(final int channelId,
+ final AMQShortString queue,
+ final long messageCount,
+ final long consumerCount)
+ {
+ return new AMQFrame(channelId, new QueueDeclareOkBody(queue, messageCount, consumerCount));
+ }
+
+ @Override
+ public AMQFrame queueBind(final int channelId,
+ final AMQShortString queue,
+ final AMQShortString exchange,
+ final AMQShortString bindingKey,
+ final boolean nowait,
+ final FieldTable arguments)
+ {
+ return new AMQFrame(channelId, new QueueBindBody(0, queue, exchange, bindingKey, nowait, arguments));
+ }
+
+ @Override
+ public AMQFrame queuePurge(final int channelId, final AMQShortString queue, final boolean nowait)
+ {
+ return new AMQFrame(channelId, new QueuePurgeBody(0, queue, nowait));
+ }
+
+ @Override
+ public AMQFrame queuePurgeOk(final int channelId, final long messageCount)
+ {
+ return new AMQFrame(channelId, new QueuePurgeOkBody(messageCount));
+ }
+
+ @Override
+ public AMQFrame queueDelete(final int channelId,
+ final AMQShortString queue,
+ final boolean ifUnused,
+ final boolean ifEmpty,
+ final boolean nowait)
+ {
+ return new AMQFrame(channelId, new QueueDeleteBody(0, queue, ifUnused, ifEmpty, nowait));
+ }
+
+ @Override
+ public AMQFrame queueDeleteOk(final int channelId, final long messageCount)
+ {
+ return new AMQFrame(channelId, new QueueDeleteOkBody(messageCount));
+ }
+
+ @Override
+ public AMQFrame queueUnbind(final int channelId,
+ final AMQShortString queue,
+ final AMQShortString exchange,
+ final AMQShortString bindingKey,
+ final FieldTable arguments)
+ {
+ return new AMQFrame(channelId, new QueueUnbindBody(0, queue, exchange, bindingKey, arguments));
+ }
+
+ @Override
+ public AMQFrame basicRecoverSyncOk(final int channelId)
+ {
+ return new AMQFrame(channelId, new BasicRecoverSyncOkBody(getProtocolVersion()));
+ }
+
+ @Override
+ public AMQFrame basicRecover(final int channelId, final boolean requeue, final boolean sync)
+ {
+ if(ProtocolVersion.v8_0.equals(getProtocolVersion()) || !sync)
+ {
+ return new AMQFrame(channelId, new BasicRecoverBody(requeue));
+ }
+ else
+ {
+ return new AMQFrame(channelId, new BasicRecoverSyncBody(getProtocolVersion(), requeue));
+ }
+ }
+
+ @Override
+ public AMQFrame basicQos(final int channelId,
+ final long prefetchSize,
+ final int prefetchCount,
+ final boolean global)
+ {
+ return new AMQFrame(channelId, new BasicQosBody(prefetchSize, prefetchCount, global));
+ }
+
+ @Override
+ public AMQFrame basicQosOk(final int channelId)
+ {
+ return new AMQFrame(channelId, new BasicQosOkBody());
+ }
+
+ @Override
+ public AMQFrame basicConsume(final int channelId,
+ final AMQShortString queue,
+ final AMQShortString consumerTag,
+ final boolean noLocal,
+ final boolean noAck,
+ final boolean exclusive,
+ final boolean nowait,
+ final FieldTable arguments)
+ {
+ return new AMQFrame(channelId, new BasicConsumeBody(0, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments));
+ }
+
+ @Override
+ public AMQFrame basicConsumeOk(final int channelId, final AMQShortString consumerTag)
+ {
+ return new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag));
+ }
+
+ @Override
+ public AMQFrame basicCancel(final int channelId, final AMQShortString consumerTag, final boolean noWait)
+ {
+ return new AMQFrame(channelId, new BasicCancelBody(consumerTag, noWait));
+ }
+
+ @Override
+ public AMQFrame basicCancelOk(final int channelId, final AMQShortString consumerTag)
+ {
+ return new AMQFrame(channelId, new BasicCancelOkBody(consumerTag));
+ }
+
+ @Override
+ public AMQFrame basicPublish(final int channelId,
+ final AMQShortString exchange,
+ final AMQShortString routingKey,
+ final boolean mandatory,
+ final boolean immediate)
+ {
+ return new AMQFrame(channelId, new BasicPublishBody(0, exchange, routingKey, mandatory, immediate));
+ }
+
+ @Override
+ public AMQFrame basicReturn(final int channelId, final int replyCode,
+ final AMQShortString replyText,
+ final AMQShortString exchange,
+ final AMQShortString routingKey)
+ {
+ return new AMQFrame(channelId, new BasicReturnBody(replyCode, replyText, exchange, routingKey));
+ }
+
+ @Override
+ public AMQFrame basicDeliver(final int channelId,
+ final AMQShortString consumerTag,
+ final long deliveryTag,
+ final boolean redelivered,
+ final AMQShortString exchange,
+ final AMQShortString routingKey)
+ {
+ return new AMQFrame(channelId, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey));
+ }
+
+ @Override
+ public AMQFrame basicGet(final int channelId, final AMQShortString queue, final boolean noAck)
+ {
+ return new AMQFrame(channelId, new BasicGetBody(0, queue, noAck));
+ }
+
+ @Override
+ public AMQFrame basicGetOk(final int channelId,
+ final long deliveryTag,
+ final boolean redelivered,
+ final AMQShortString exchange,
+ final AMQShortString routingKey,
+ final long messageCount)
+ {
+ return new AMQFrame(channelId, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount));
+ }
+
+ @Override
+ public AMQFrame basicGetEmpty(final int channelId)
+ {
+ return new AMQFrame(channelId, new BasicGetEmptyBody((AMQShortString)null));
+ }
+
+ @Override
+ public AMQFrame basicAck(final int channelId, final long deliveryTag, final boolean multiple)
+ {
+ return new AMQFrame(channelId, new BasicAckBody(deliveryTag, multiple));
+ }
+
+ @Override
+ public AMQFrame basicReject(final int channelId, final long deliveryTag, final boolean requeue)
+ {
+ return new AMQFrame(channelId, new BasicRejectBody(deliveryTag, requeue));
+ }
+
+ @Override
+ public AMQFrame heartbeat()
+ {
+ return new AMQFrame(0, new HeartbeatBody());
+ }
+
+ private ProtocolVersion getProtocolVersion()
+ {
+ return _methodRegistry.getProtocolVersion();
+ }
+
+ @Override
+ public AMQFrame messageContent(final int channelId, final byte[] data)
+ {
+ return new AMQFrame(channelId, new ContentBody(data));
+ }
+
+ @Override
+ public AMQFrame messageHeader(final int channelId,
+ final BasicContentHeaderProperties properties,
+ final long bodySize)
+ {
+ return new AMQFrame(channelId, new ContentHeaderBody(properties, bodySize));
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
index 1613cd055e..23f71c62db 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
@@ -20,13 +20,14 @@
*/
package org.apache.qpid.framing;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.MarkableDataInput;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+
public class HeartbeatBody implements AMQBody
{
public static final byte TYPE = 8;
@@ -79,4 +80,17 @@ public class HeartbeatBody implements AMQBody
{
return new AMQFrame(0, this);
}
+
+ public static <T> T process(final int channel,
+ final MarkableDataInput in,
+ final MethodProcessor<T> processor,
+ final long bodySize) throws IOException
+ {
+
+ if(bodySize > 0)
+ {
+ in.skip(bodySize);
+ }
+ return processor.heartbeat();
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
deleted file mode 100644
index 971caca41a..0000000000
--- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.framing;
-
-import org.apache.qpid.codec.MarkableDataInput;
-
-public class HeartbeatBodyFactory implements BodyFactory
-{
- public AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException
- {
- return new HeartbeatBody();
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java
new file mode 100644
index 0000000000..ecedacaba4
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java
@@ -0,0 +1,197 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+public interface MethodProcessor<T>
+{
+ T connectionStart(short versionMajor,
+ short versionMinor,
+ FieldTable serverProperties,
+ byte[] mechanisms,
+ byte[] locales);
+
+ T connectionStartOk(FieldTable clientProperties,
+ AMQShortString mechanism,
+ byte[] response,
+ AMQShortString locale);
+
+ T txSelect(int channelId);
+
+ T txSelectOk(int channelId);
+
+ T txCommit(int channelId);
+
+ T txCommitOk(int channelId);
+
+ T txRollback(int channelId);
+
+ T txRollbackOk(int channelId);
+
+ T connectionSecure(byte[] challenge);
+
+ T connectionSecureOk(byte[] response);
+
+ T connectionTune(int channelMax, long frameMax, int heartbeat);
+
+ T connectionTuneOk(int channelMax, long frameMax, int heartbeat);
+
+ T connectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist);
+
+ T connectionOpenOk(AMQShortString knownHosts);
+
+ T connectionRedirect(AMQShortString host, AMQShortString knownHosts);
+
+ T connectionClose(int replyCode, AMQShortString replyText, int classId, int methodId);
+
+ T connectionCloseOk();
+
+ T channelOpen(int channelId);
+
+ T channelOpenOk(int channelId);
+
+ T channelFlow(int channelId, boolean active);
+
+ T channelFlowOk(int channelId, boolean active);
+
+ T channelAlert(int channelId, int replyCode, final AMQShortString replyText, FieldTable details);
+
+ T channelClose(int channelId, int replyCode, AMQShortString replyText, int classId, int methodId);
+
+ T channelCloseOk(int channelId);
+
+ T accessRequest(int channelId,
+ AMQShortString realm,
+ boolean exclusive,
+ boolean passive,
+ boolean active,
+ boolean write, boolean read);
+
+ T accessRequestOk(int channelId, int ticket);
+
+ T exchangeDeclare(int channelId,
+ AMQShortString exchange,
+ AMQShortString type,
+ boolean passive,
+ boolean durable,
+ boolean autoDelete, boolean internal, boolean nowait, final FieldTable arguments);
+
+ T exchangeDeclareOk(int channelId);
+
+ T exchangeDelete(int channelId, AMQShortString exchange, boolean ifUnused, boolean nowait);
+
+ T exchangeDeleteOk(int channelId);
+
+ T exchangeBound(int channelId, AMQShortString exchange, AMQShortString routingKey, AMQShortString queue);
+
+ T exchangeBoundOk(int channelId, int replyCode, AMQShortString replyText);
+
+ T queueBindOk(int channelId);
+
+ T queueUnbindOk(final int channelId);
+
+ T queueDeclare(int channelId,
+ AMQShortString queue,
+ boolean passive,
+ boolean durable,
+ boolean exclusive,
+ boolean autoDelete, boolean nowait, FieldTable arguments);
+
+ T queueDeclareOk(int channelId, final AMQShortString queue, long messageCount, long consumerCount);
+
+ T queueBind(int channelId,
+ AMQShortString queue,
+ AMQShortString exchange,
+ AMQShortString bindingKey,
+ boolean nowait, FieldTable arguments);
+
+ T queuePurge(int channelId, AMQShortString queue, boolean nowait);
+
+ T queuePurgeOk(int channelId, long messageCount);
+
+ T queueDelete(int channelId, AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait);
+
+ T queueDeleteOk(int channelId, long messageCount);
+
+ T queueUnbind(int channelId,
+ AMQShortString queue,
+ AMQShortString exchange,
+ AMQShortString bindingKey,
+ FieldTable arguments);
+
+ T basicRecoverSyncOk(int channelId);
+
+ T basicRecover(int channelId, final boolean requeue, boolean sync);
+
+ T basicQos(int channelId, long prefetchSize, int prefetchCount, boolean global);
+
+ T basicQosOk(int channelId);
+
+ T basicConsume(int channelId,
+ AMQShortString queue,
+ AMQShortString consumerTag,
+ boolean noLocal,
+ boolean noAck,
+ boolean exclusive, boolean nowait, FieldTable arguments);
+
+ T basicConsumeOk(int channelId, AMQShortString consumerTag);
+
+ T basicCancel(int channelId, AMQShortString consumerTag, boolean noWait);
+
+ T basicCancelOk(int channelId, AMQShortString consumerTag);
+
+ T basicPublish(int channelId,
+ AMQShortString exchange,
+ AMQShortString routingKey,
+ boolean mandatory,
+ boolean immediate);
+
+ T basicReturn(final int channelId,
+ int replyCode,
+ AMQShortString replyText,
+ AMQShortString exchange,
+ AMQShortString routingKey);
+
+ T basicDeliver(int channelId,
+ AMQShortString consumerTag,
+ long deliveryTag,
+ boolean redelivered,
+ AMQShortString exchange, AMQShortString routingKey);
+
+ T basicGet(int channelId, AMQShortString queue, boolean noAck);
+
+ T basicGetOk(int channelId,
+ long deliveryTag,
+ boolean redelivered,
+ AMQShortString exchange,
+ AMQShortString routingKey, long messageCount);
+
+ T basicGetEmpty(int channelId);
+
+ T basicAck(int channelId, long deliveryTag, boolean multiple);
+
+ T basicReject(int channelId, long deliveryTag, boolean requeue);
+
+ T heartbeat();
+
+ T messageContent(int channelId, byte[] data);
+
+ T messageHeader(int channelId, BasicContentHeaderProperties properties, long bodySize);
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java b/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java
index 0fad853194..c4fd131d0e 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java
@@ -29,302 +29,23 @@
package org.apache.qpid.framing;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.qpid.codec.MarkableDataInput;
-import org.apache.qpid.protocol.AMQConstant;
-
-
public final class MethodRegistry
{
- private static final Map<ProtocolVersion, MethodRegistry> _registries = new HashMap<>();
-
-
- public static final MethodRegistry registry_0_9 =
- new MethodRegistry(ProtocolVersion.v0_9);
-
- public static final MethodRegistry registry_0_91 =
- new MethodRegistry(ProtocolVersion.v0_91);
-
- public static final MethodRegistry registry_8_0 =
- new MethodRegistry(ProtocolVersion.v8_0);
-
- private final ProtocolVersion _protocolVersion;
-
- public final AMQMethodBody convertToBody(MarkableDataInput in, long size)
- throws AMQFrameDecodingException, IOException
- {
- final int classAndMethod = in.readInt();
-
- AMQMethodBody methodBody;
- switch(classAndMethod)
- {
- //CONNECTION_CLASS:
- case 0x000a000a:
- methodBody = new ConnectionStartBody(in);
- break;
- case 0x000a000b:
- methodBody = new ConnectionStartOkBody(in);
- break;
- case 0x000a0014:
- methodBody = new ConnectionSecureBody(in);
- break;
- case 0x000a0015:
- methodBody = new ConnectionSecureOkBody(in);
- break;
- case 0x000a001e:
- methodBody = new ConnectionTuneBody(in);
- break;
- case 0x000a001f:
- methodBody = new ConnectionTuneOkBody(in);
- break;
- case 0x000a0028:
- methodBody = new ConnectionOpenBody(in);
- break;
- case 0x000a0029:
- methodBody = new ConnectionOpenOkBody(in);
- break;
- case 0x000a002a:
- methodBody = new ConnectionRedirectBody(in, _protocolVersion);
- break;
- case 0x000a0032:
- if(_protocolVersion.equals(ProtocolVersion.v8_0))
- {
- methodBody = new ConnectionRedirectBody(in, _protocolVersion);
- }
- else
- {
- methodBody = new ConnectionCloseBody(in, _protocolVersion);
- }
- break;
- case 0x000a0033:
- if(_protocolVersion.equals(ProtocolVersion.v8_0))
- {
- throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF));
- }
- else
- {
- methodBody = ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_9;
- }
- break;
- case 0x000a003c:
- if(_protocolVersion.equals(ProtocolVersion.v8_0))
- {
- methodBody = new ConnectionCloseBody(in, _protocolVersion);
- }
- else
- {
- throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF));
- }
- break;
- case 0x000a003d:
- if(_protocolVersion.equals(ProtocolVersion.v8_0))
- {
- methodBody = ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_8;
- }
- else
- {
- throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF));
- }
- break;
-
- // CHANNEL_CLASS:
-
- case 0x0014000a:
- methodBody = new ChannelOpenBody(in);
- break;
- case 0x0014000b:
- methodBody = ChannelOpenOkBody.getInstance(_protocolVersion, in);
- break;
- case 0x00140014:
- methodBody = new ChannelFlowBody(in);
- break;
- case 0x00140015:
- methodBody = new ChannelFlowOkBody(in);
- break;
- case 0x0014001e:
- methodBody = new ChannelAlertBody(in);
- break;
- case 0x00140028:
- methodBody = new ChannelCloseBody(in);
- break;
- case 0x00140029:
- methodBody = new ChannelCloseOkBody(in);
- break;
- // ACCESS_CLASS:
-
- case 0x001e000a:
- methodBody = new AccessRequestBody(in);
- break;
- case 0x001e000b:
- methodBody = new AccessRequestOkBody(in);
- break;
-
- // EXCHANGE_CLASS:
-
- case 0x0028000a:
- methodBody = new ExchangeDeclareBody(in);
- break;
- case 0x0028000b:
- methodBody = new ExchangeDeclareOkBody(in);
- break;
- case 0x00280014:
- methodBody = new ExchangeDeleteBody(in);
- break;
- case 0x00280015:
- methodBody = new ExchangeDeleteOkBody(in);
- break;
- case 0x00280016:
- methodBody = new ExchangeBoundBody(in);
- break;
- case 0x00280017:
- methodBody = new ExchangeBoundOkBody(in);
- break;
-
-
- // QUEUE_CLASS:
-
- case 0x0032000a:
- methodBody = new QueueDeclareBody(in);
- break;
- case 0x0032000b:
- methodBody = new QueueDeclareOkBody(in);
- break;
- case 0x00320014:
- methodBody = new QueueBindBody(in);
- break;
- case 0x00320015:
- methodBody = new QueueBindOkBody(in);
- break;
- case 0x0032001e:
- methodBody = new QueuePurgeBody(in);
- break;
- case 0x0032001f:
- methodBody = new QueuePurgeOkBody(in);
- break;
- case 0x00320028:
- methodBody = new QueueDeleteBody(in);
- break;
- case 0x00320029:
- methodBody = new QueueDeleteOkBody(in);
- break;
- case 0x00320032:
- methodBody = new QueueUnbindBody(in);
- break;
- case 0x00320033:
- methodBody = new QueueUnbindOkBody(in);
- break;
-
-
- // BASIC_CLASS:
-
- case 0x003c000a:
- methodBody = new BasicQosBody(in);
- break;
- case 0x003c000b:
- methodBody = new BasicQosOkBody(in);
- break;
- case 0x003c0014:
- methodBody = new BasicConsumeBody(in);
- break;
- case 0x003c0015:
- methodBody = new BasicConsumeOkBody(in);
- break;
- case 0x003c001e:
- methodBody = new BasicCancelBody(in);
- break;
- case 0x003c001f:
- methodBody = new BasicCancelOkBody(in);
- break;
- case 0x003c0028:
- methodBody = new BasicPublishBody(in);
- break;
- case 0x003c0032:
- methodBody = new BasicReturnBody(in);
- break;
- case 0x003c003c:
- methodBody = new BasicDeliverBody(in);
- break;
- case 0x003c0046:
- methodBody = new BasicGetBody(in);
- break;
- case 0x003c0047:
- methodBody = new BasicGetOkBody(in);
- break;
- case 0x003c0048:
- methodBody = new BasicGetEmptyBody(in);
- break;
- case 0x003c0050:
- methodBody = new BasicAckBody(in);
- break;
- case 0x003c005a:
- methodBody = new BasicRejectBody(in);
- break;
- case 0x003c0064:
- methodBody = new BasicRecoverBody(in);
- break;
- case 0x003c0065:
- methodBody = new BasicRecoverSyncOkBody(_protocolVersion);
- break;
- case 0x003c0066:
- methodBody = new BasicRecoverSyncBody(in, _protocolVersion);
- break;
- case 0x003c006e:
- methodBody = new BasicRecoverSyncBody(in, _protocolVersion);
- break;
- case 0x003c006f:
- methodBody = new BasicRecoverSyncOkBody(_protocolVersion);
- break;
-
- // TX_CLASS:
-
- case 0x005a000a:
- methodBody = TxSelectBody.INSTANCE;
- break;
- case 0x005a000b:
- methodBody = TxSelectOkBody.INSTANCE;
- break;
- case 0x005a0014:
- methodBody = TxCommitBody.INSTANCE;
- break;
- case 0x005a0015:
- methodBody = TxCommitOkBody.INSTANCE;
- break;
- case 0x005a001e:
- methodBody = TxRollbackBody.INSTANCE;
- break;
- case 0x005a001f:
- methodBody = TxRollbackOkBody.INSTANCE;
- break;
-
- default:
- throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF));
-
- }
- return methodBody;
- }
-
- private AMQFrameDecodingException newUnknownMethodException(final int classId, final int methodId)
- {
- return new AMQFrameDecodingException(AMQConstant.COMMAND_INVALID,
- "Method " + methodId + " unknown in AMQP version " + _protocolVersion
- + " (while trying to decode class " + classId + " method " + methodId + ".");
- }
-
- private MethodRegistry(ProtocolVersion pv)
- {
- _registries.put(pv, this);
+ private final FrameCreatingMethodProcessor _methodProcessor;
+ private ProtocolVersion _protocolVersion;
+
+
+ public MethodRegistry(ProtocolVersion pv)
+ {
_protocolVersion = pv;
+ _methodProcessor = new FrameCreatingMethodProcessor(this);
}
- public static MethodRegistry getMethodRegistry(ProtocolVersion pv)
+ public void setProtocolVersion(final ProtocolVersion protocolVersion)
{
- return _registries.get(pv);
+ _protocolVersion = protocolVersion;
}
-
public final AccessRequestBody createAccessRequestBody(final AMQShortString realm,
final boolean exclusive,
final boolean passive,
@@ -502,7 +223,7 @@ public final class MethodRegistry
public final ChannelOpenBody createChannelOpenBody(final AMQShortString outOfBand)
{
- return new ChannelOpenBody(outOfBand);
+ return new ChannelOpenBody();
}
public final ChannelOpenOkBody createChannelOpenOkBody(byte[] channelId)
@@ -540,7 +261,7 @@ public final class MethodRegistry
public final ChannelCloseOkBody createChannelCloseOkBody()
{
- return new ChannelCloseOkBody();
+ return ChannelCloseOkBody.INSTANCE;
}
@@ -829,4 +550,15 @@ public final class MethodRegistry
return TxRollbackOkBody.INSTANCE;
}
+ public ProtocolVersion getProtocolVersion()
+ {
+ return _protocolVersion;
+ }
+
+ public FrameCreatingMethodProcessor getMethodProcessor()
+ {
+ return _methodProcessor;
+ }
+
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/MethodRegistrySource.java b/java/common/src/main/java/org/apache/qpid/framing/MethodRegistrySource.java
deleted file mode 100644
index 4d2eda68b2..0000000000
--- a/java/common/src/main/java/org/apache/qpid/framing/MethodRegistrySource.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.framing;
-
-public interface MethodRegistrySource
-{
- public MethodRegistry getMethodRegistry();
-}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java
index ddcaf6290e..42e1c44d7d 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java
@@ -50,12 +50,12 @@ public class QueueBindBody extends AMQMethodBodyImpl implements EncodableAMQData
// Constructor
public QueueBindBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _ticket = readUnsignedShort( buffer );
- _queue = readAMQShortString( buffer );
- _exchange = readAMQShortString( buffer );
- _routingKey = readAMQShortString( buffer );
- _bitfield0 = readBitfield( buffer );
- _arguments = readFieldTable( buffer );
+ _ticket = buffer.readUnsignedShort();
+ _queue = buffer.readAMQShortString();
+ _exchange = buffer.readAMQShortString();
+ _routingKey = buffer.readAMQShortString();
+ _bitfield0 = buffer.readByte();
+ _arguments = EncodingUtils.readFieldTable(buffer);
}
public QueueBindBody(
@@ -165,4 +165,17 @@ public class QueueBindBody extends AMQMethodBodyImpl implements EncodableAMQData
return buf.toString();
}
+ public static <T> T process(final int channelId,
+ final MarkableDataInput buffer,
+ final MethodProcessor<T> dispatcher) throws IOException, AMQFrameDecodingException
+ {
+
+ int ticket = buffer.readUnsignedShort();
+ AMQShortString queue = buffer.readAMQShortString();
+ AMQShortString exchange = buffer.readAMQShortString();
+ AMQShortString bindingKey = buffer.readAMQShortString();
+ boolean nowait = (buffer.readByte() & 0x01) == 0x01;
+ FieldTable arguments = EncodingUtils.readFieldTable(buffer);
+ return dispatcher.queueBind(channelId, queue, exchange, bindingKey, nowait, arguments);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java
index c7322aa71c..3a8d2f41a5 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java
@@ -48,10 +48,10 @@ public class QueueDeclareBody extends AMQMethodBodyImpl implements EncodableAMQD
// Constructor
public QueueDeclareBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _ticket = readUnsignedShort( buffer );
- _queue = readAMQShortString( buffer );
- _bitfield0 = readBitfield( buffer );
- _arguments = readFieldTable( buffer );
+ _ticket = buffer.readUnsignedShort();
+ _queue = buffer.readAMQShortString();
+ _bitfield0 = buffer.readByte();
+ _arguments = EncodingUtils.readFieldTable(buffer);
}
public QueueDeclareBody(
@@ -191,4 +191,21 @@ public class QueueDeclareBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
+ public static <T> T process(final int channelId,
+ final MarkableDataInput buffer,
+ final MethodProcessor<T> dispatcher) throws IOException, AMQFrameDecodingException
+ {
+
+ int ticket = buffer.readUnsignedShort();
+ AMQShortString queue = buffer.readAMQShortString();
+ byte bitfield = buffer.readByte();
+
+ boolean passive = (bitfield & 0x01 ) == 0x01;
+ boolean durable = (bitfield & 0x02 ) == 0x02;
+ boolean exclusive = (bitfield & 0x04 ) == 0x04;
+ boolean autoDelete = (bitfield & 0x08 ) == 0x08;
+ boolean nowait = (bitfield & 0x010 ) == 0x010;
+ FieldTable arguments = EncodingUtils.readFieldTable(buffer);
+ return dispatcher.queueDeclare(channelId, queue, passive, durable, exclusive, autoDelete, nowait, arguments);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java
index 7ee65e377a..47deb9cd6d 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java
@@ -47,9 +47,9 @@ public class QueueDeclareOkBody extends AMQMethodBodyImpl implements EncodableAM
// Constructor
public QueueDeclareOkBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _queue = readAMQShortString( buffer );
- _messageCount = readUnsignedInteger( buffer );
- _consumerCount = readUnsignedInteger( buffer );
+ _queue = buffer.readAMQShortString();
+ _messageCount = EncodingUtils.readUnsignedInteger(buffer);
+ _consumerCount = EncodingUtils.readUnsignedInteger(buffer);
}
public QueueDeclareOkBody(
@@ -120,4 +120,13 @@ public class QueueDeclareOkBody extends AMQMethodBodyImpl implements EncodableAM
return buf.toString();
}
+ public static <T> T process(final int channelId,
+ final MarkableDataInput buffer,
+ final MethodProcessor<T> dispatcher) throws IOException
+ {
+ AMQShortString queue = buffer.readAMQShortString();
+ long messageCount = EncodingUtils.readUnsignedInteger(buffer);
+ long consumerCount = EncodingUtils.readUnsignedInteger(buffer);
+ return dispatcher.queueDeclareOk(channelId, queue, messageCount, consumerCount);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java
index 6e534fd556..fc9795f48b 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java
@@ -47,9 +47,9 @@ public class QueueDeleteBody extends AMQMethodBodyImpl implements EncodableAMQDa
// Constructor
public QueueDeleteBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _ticket = readUnsignedShort( buffer );
- _queue = readAMQShortString( buffer );
- _bitfield0 = readBitfield( buffer );
+ _ticket = buffer.readUnsignedShort();
+ _queue = buffer.readAMQShortString();
+ _bitfield0 = buffer.readByte();
}
public QueueDeleteBody(
@@ -151,4 +151,18 @@ public class QueueDeleteBody extends AMQMethodBodyImpl implements EncodableAMQDa
return buf.toString();
}
+ public static <T> T process(final int channelId,
+ final MarkableDataInput buffer,
+ final MethodProcessor<T> dispatcher) throws IOException
+ {
+
+ int ticket = buffer.readUnsignedShort();
+ AMQShortString queue = buffer.readAMQShortString();
+ byte bitfield = buffer.readByte();
+
+ boolean ifUnused = (bitfield & 0x01) == 0x01;
+ boolean ifEmpty = (bitfield & 0x02) == 0x02;
+ boolean nowait = (bitfield & 0x04) == 0x04;
+ return dispatcher.queueDelete(channelId, queue, ifUnused, ifEmpty, nowait);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java
index cb58db5de6..b04f844084 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java
@@ -45,7 +45,7 @@ public class QueueDeleteOkBody extends AMQMethodBodyImpl implements EncodableAMQ
// Constructor
public QueueDeleteOkBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _messageCount = readUnsignedInteger( buffer );
+ _messageCount = EncodingUtils.readUnsignedInteger(buffer);
}
public QueueDeleteOkBody(
@@ -95,4 +95,11 @@ public class QueueDeleteOkBody extends AMQMethodBodyImpl implements EncodableAMQ
return buf.toString();
}
+ public static <T> T process(final int channelId,
+ final MarkableDataInput buffer,
+ final MethodProcessor<T> dispatcher) throws IOException
+ {
+ long messageCount = EncodingUtils.readUnsignedInteger(buffer);
+ return dispatcher.queueDeleteOk(channelId, messageCount);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java
index 713b0b24ad..d2f41922cc 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java
@@ -47,9 +47,9 @@ public class QueuePurgeBody extends AMQMethodBodyImpl implements EncodableAMQDat
// Constructor
public QueuePurgeBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _ticket = readUnsignedShort( buffer );
- _queue = readAMQShortString( buffer );
- _bitfield0 = readBitfield( buffer );
+ _ticket = buffer.readUnsignedShort();
+ _queue = buffer.readAMQShortString();
+ _bitfield0 = buffer.readByte();
}
public QueuePurgeBody(
@@ -125,4 +125,14 @@ public class QueuePurgeBody extends AMQMethodBodyImpl implements EncodableAMQDat
return buf.toString();
}
+ public static <T> T process(final int channelId,
+ final MarkableDataInput buffer,
+ final MethodProcessor<T> dispatcher) throws IOException
+ {
+
+ int ticket = buffer.readUnsignedShort();
+ AMQShortString queue = buffer.readAMQShortString();
+ boolean nowait = (buffer.readByte() & 0x01) == 0x01;
+ return dispatcher.queuePurge(channelId, queue, nowait);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java
index c2bc1caf14..da5ba766ae 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java
@@ -45,7 +45,7 @@ public class QueuePurgeOkBody extends AMQMethodBodyImpl implements EncodableAMQD
// Constructor
public QueuePurgeOkBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _messageCount = readUnsignedInteger( buffer );
+ _messageCount = EncodingUtils.readUnsignedInteger(buffer);
}
public QueuePurgeOkBody(
@@ -95,4 +95,11 @@ public class QueuePurgeOkBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
+ public static <T> T process(final int channelId,
+ final MarkableDataInput buffer,
+ final MethodProcessor<T> dispatcher) throws IOException
+ {
+ long messageCount = EncodingUtils.readUnsignedInteger(buffer);
+ return dispatcher.queuePurgeOk(channelId, messageCount);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java
index a5aba58f15..968cc02212 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java
@@ -49,11 +49,11 @@ public class QueueUnbindBody extends AMQMethodBodyImpl implements EncodableAMQDa
// Constructor
public QueueUnbindBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
{
- _ticket = readUnsignedShort( buffer );
- _queue = readAMQShortString( buffer );
- _exchange = readAMQShortString( buffer );
- _routingKey = readAMQShortString( buffer );
- _arguments = readFieldTable( buffer );
+ _ticket = buffer.readUnsignedShort();
+ _queue = buffer.readAMQShortString();
+ _exchange = buffer.readAMQShortString();
+ _routingKey = buffer.readAMQShortString();
+ _arguments = EncodingUtils.readFieldTable(buffer);
}
public QueueUnbindBody(
@@ -147,4 +147,16 @@ public class QueueUnbindBody extends AMQMethodBodyImpl implements EncodableAMQDa
return buf.toString();
}
+ public static <T> T process(final int channelId,
+ final MarkableDataInput buffer,
+ final MethodProcessor<T> dispatcher) throws IOException, AMQFrameDecodingException
+ {
+
+ int ticket = buffer.readUnsignedShort();
+ AMQShortString queue = buffer.readAMQShortString();
+ AMQShortString exchange = buffer.readAMQShortString();
+ AMQShortString routingKey = buffer.readAMQShortString();
+ FieldTable arguments = EncodingUtils.readFieldTable(buffer);
+ return dispatcher.queueUnbind(channelId, queue, exchange, routingKey, arguments);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindOkBody.java
index 27c49a0804..2e504d6fc7 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindOkBody.java
@@ -27,11 +27,11 @@
package org.apache.qpid.framing;
-import org.apache.qpid.codec.MarkableDataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.MarkableDataInput;
public class QueueUnbindOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{