diff options
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid/framing')
63 files changed, 1599 insertions, 857 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index 291b7e8d29..0a0a570bc3 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -31,15 +31,6 @@ import org.apache.qpid.protocol.AMQConstant; public class AMQDataBlockDecoder { - private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE]; - - static - { - _bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance(); - _bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance(); - _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory(); - } - private Logger _logger = LoggerFactory.getLogger(AMQDataBlockDecoder.class); private int _maxFrameSize = AMQConstant.FRAME_MIN_SIZE.getCode(); @@ -71,26 +62,13 @@ public class AMQDataBlockDecoder } - public AMQFrame createAndPopulateFrame(BodyFactory methodBodyFactory, MarkableDataInput in) + public <T> T createAndPopulateFrame(ProtocolVersion pv, + MethodProcessor<T> processor, + MarkableDataInput in) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException { final byte type = in.readByte(); - BodyFactory bodyFactory; - if (type == AMQMethodBody.TYPE) - { - bodyFactory = methodBodyFactory; - } - else - { - bodyFactory = _bodiesSupported[type]; - } - - if (bodyFactory == null) - { - throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Unsupported frame type: " + type); - } - final int channel = in.readUnsignedShort(); final long bodySize = EncodingUtils.readUnsignedInteger(in); @@ -101,7 +79,24 @@ public class AMQDataBlockDecoder + " bodySize = " + bodySize); } - AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory); + T result; + switch(type) + { + case 1: + result = processMethod(channel, in, processor, pv); + break; + case 2: + result = ContentHeaderBody.process(channel, in, processor, bodySize); + break; + case 3: + result = ContentBody.process(channel, in, processor, bodySize); + break; + case 8: + result = HeartbeatBody.process(channel, in, processor, bodySize); + break; + default: + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Unsupported frame type: " + type); + } byte marker = in.readByte(); if ((marker & 0xFF) != 0xCE) @@ -110,11 +105,209 @@ public class AMQDataBlockDecoder + " type=" + type); } - return frame; + return result; } public void setMaxFrameSize(final int maxFrameSize) { _maxFrameSize = maxFrameSize; } + + private <T> T processMethod(int channelId, MarkableDataInput in, MethodProcessor<T> dispatcher, ProtocolVersion protocolVersion) + throws AMQFrameDecodingException, IOException + { + final int classAndMethod = in.readInt(); + + switch (classAndMethod) + { + //CONNECTION_CLASS: + case 0x000a000a: + return ConnectionStartBody.process(in, dispatcher); + case 0x000a000b: + return ConnectionStartOkBody.process(in, dispatcher); + case 0x000a0014: + return ConnectionSecureBody.process(in, dispatcher); + case 0x000a0015: + return ConnectionSecureOkBody.process(in, dispatcher); + case 0x000a001e: + return ConnectionTuneBody.process(in, dispatcher); + case 0x000a001f: + return ConnectionTuneOkBody.process(in, dispatcher); + case 0x000a0028: + return ConnectionOpenBody.process(in, dispatcher); + case 0x000a0029: + return ConnectionOpenOkBody.process(in, dispatcher); + case 0x000a002a: + return ConnectionRedirectBody.process(in, dispatcher); + case 0x000a0032: + if (protocolVersion.equals(ProtocolVersion.v8_0)) + { + return ConnectionRedirectBody.process(in, dispatcher); + } + else + { + return ConnectionCloseBody.process(in, dispatcher); + } + case 0x000a0033: + if (protocolVersion.equals(ProtocolVersion.v8_0)) + { + throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), protocolVersion); + } + else + { + return dispatcher.connectionCloseOk(); + } + case 0x000a003c: + if (protocolVersion.equals(ProtocolVersion.v8_0)) + { + return ConnectionCloseBody.process(in, dispatcher); + } + else + { + throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), protocolVersion); + } + case 0x000a003d: + if (protocolVersion.equals(ProtocolVersion.v8_0)) + { + return dispatcher.connectionCloseOk(); + } + else + { + throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), protocolVersion); + } + + // CHANNEL_CLASS: + + case 0x0014000a: + return ChannelOpenBody.process(channelId, in, dispatcher); + case 0x0014000b: + return ChannelOpenOkBody.process(channelId, in, protocolVersion, dispatcher); + case 0x00140014: + return ChannelFlowBody.process(channelId, in, dispatcher); + case 0x00140015: + return ChannelFlowOkBody.process(channelId, in, dispatcher); + case 0x0014001e: + return ChannelAlertBody.process(channelId, in, dispatcher); + case 0x00140028: + return ChannelCloseBody.process(channelId, in, dispatcher); + case 0x00140029: + return dispatcher.channelCloseOk(channelId); + + // ACCESS_CLASS: + + case 0x001e000a: + return AccessRequestBody.process(channelId, in, dispatcher); + case 0x001e000b: + return AccessRequestOkBody.process(channelId, in, dispatcher); + + // EXCHANGE_CLASS: + + case 0x0028000a: + return ExchangeDeclareBody.process(channelId, in, dispatcher); + case 0x0028000b: + return dispatcher.exchangeDeclareOk(channelId); + case 0x00280014: + return ExchangeDeleteBody.process(channelId, in, dispatcher); + case 0x00280015: + return dispatcher.exchangeDeleteOk(channelId); + case 0x00280016: + return ExchangeBoundBody.process(channelId, in, dispatcher); + case 0x00280017: + return ExchangeBoundOkBody.process(channelId, in, dispatcher); + + + // QUEUE_CLASS: + + case 0x0032000a: + return QueueDeclareBody.process(channelId, in, dispatcher); + case 0x0032000b: + return QueueDeclareOkBody.process(channelId, in, dispatcher); + case 0x00320014: + return QueueBindBody.process(channelId, in, dispatcher); + case 0x00320015: + return dispatcher.queueBindOk(channelId); + case 0x0032001e: + return QueuePurgeBody.process(channelId, in, dispatcher); + case 0x0032001f: + return QueuePurgeOkBody.process(channelId, in, dispatcher); + case 0x00320028: + return QueueDeleteBody.process(channelId, in, dispatcher); + case 0x00320029: + return QueueDeleteOkBody.process(channelId, in, dispatcher); + case 0x00320032: + return QueueUnbindBody.process(channelId, in, dispatcher); + case 0x00320033: + return dispatcher.queueUnbindOk(channelId); + + + // BASIC_CLASS: + + case 0x003c000a: + return BasicQosBody.process(channelId, in, dispatcher); + case 0x003c000b: + return dispatcher.basicQosOk(channelId); + case 0x003c0014: + return BasicConsumeBody.process(channelId, in, dispatcher); + case 0x003c0015: + return BasicConsumeOkBody.process(channelId, in, dispatcher); + case 0x003c001e: + return BasicCancelBody.process(channelId, in, dispatcher); + case 0x003c001f: + return BasicCancelOkBody.process(channelId, in, dispatcher); + case 0x003c0028: + return BasicPublishBody.process(channelId, in, dispatcher); + case 0x003c0032: + return BasicReturnBody.process(channelId, in, dispatcher); + case 0x003c003c: + return BasicDeliverBody.process(channelId, in, dispatcher); + case 0x003c0046: + return BasicGetBody.process(channelId, in, dispatcher); + case 0x003c0047: + return BasicGetOkBody.process(channelId, in, dispatcher); + case 0x003c0048: + return BasicGetEmptyBody.process(channelId, in, dispatcher); + case 0x003c0050: + return BasicAckBody.process(channelId, in, dispatcher); + case 0x003c005a: + return BasicRejectBody.process(channelId, in, dispatcher); + case 0x003c0064: + return BasicRecoverBody.process(channelId, in, protocolVersion, dispatcher); + case 0x003c0065: + return dispatcher.basicRecoverSyncOk(channelId); + case 0x003c0066: + return BasicRecoverSyncBody.process(channelId, in, dispatcher); + case 0x003c006e: + return BasicRecoverSyncBody.process(channelId, in, dispatcher); + case 0x003c006f: + return dispatcher.basicRecoverSyncOk(channelId); + + // TX_CLASS: + + case 0x005a000a: + return dispatcher.txSelect(channelId); + case 0x005a000b: + return dispatcher.txSelectOk(channelId); + case 0x005a0014: + return dispatcher.txCommit(channelId); + case 0x005a0015: + return dispatcher.txCommitOk(channelId); + case 0x005a001e: + return dispatcher.txRollback(channelId); + case 0x005a001f: + return dispatcher.txRollbackOk(channelId); + + default: + throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), protocolVersion); + + } + } + + private AMQFrameDecodingException newUnknownMethodException(final int classId, final int methodId, ProtocolVersion protocolVersion) + { + return new AMQFrameDecodingException(AMQConstant.COMMAND_INVALID, + "Method " + methodId + " unknown in AMQP version " + protocolVersion + + " (while trying to decode class " + classId + " method " + methodId + "."); + } + + } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java index 238f28e73e..83397c37d8 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.framing; -import org.apache.qpid.codec.MarkableDataInput; - import java.io.DataOutput; import java.io.IOException; @@ -39,12 +37,6 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock _bodyFrame = bodyFrame; } - public AMQFrame(final MarkableDataInput in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException, IOException - { - this._channel = channel; - this._bodyFrame = bodyFactory.createBody(in,bodySize); - } - public long getSize() { return 1 + 2 + 4 + _bodyFrame.getSize() + 1; diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java deleted file mode 100644 index fc49afb131..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -import java.io.IOException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.codec.MarkableDataInput; - -public class AMQMethodBodyFactory implements BodyFactory -{ - private static final Logger _log = LoggerFactory.getLogger(AMQMethodBodyFactory.class); - - private final MethodRegistrySource _registrySource; - - public AMQMethodBodyFactory(MethodRegistrySource registrySource) - { - _registrySource = registrySource; - } - - public AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException, IOException - { - return _registrySource.getMethodRegistry().convertToBody(in, bodySize); - } -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java index d3e58c4444..e40452edea 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java @@ -28,7 +28,6 @@ import java.io.IOException; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; -import org.apache.qpid.codec.MarkableDataInput; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; @@ -107,17 +106,6 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody } - protected byte readByte(DataInput buffer) throws IOException - { - return buffer.readByte(); - } - - protected AMQShortString readAMQShortString(MarkableDataInput buffer) throws IOException - { - AMQShortString str = buffer.readAMQShortString(); - return str == null ? null : str.intern(false); - } - protected int getSizeOf(AMQShortString string) { return EncodingUtils.encodedShortStringLength(string); @@ -143,11 +131,6 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody buffer.writeInt(i); } - protected FieldTable readFieldTable(DataInput buffer) throws AMQFrameDecodingException, IOException - { - return EncodingUtils.readFieldTable(buffer); - } - protected int getSizeOf(FieldTable table) { return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates. @@ -158,11 +141,6 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody EncodingUtils.writeFieldTableBytes(buffer, table); } - protected long readLong(DataInput buffer) throws IOException - { - return buffer.readLong(); - } - protected void writeLong(DataOutput buffer, long l) throws IOException { buffer.writeLong(l); @@ -178,11 +156,6 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody EncodingUtils.writeBytes(buffer,data); } - protected byte[] readBytes(DataInput buffer) throws IOException - { - return EncodingUtils.readBytes(buffer); - } - protected short readShort(DataInput buffer) throws IOException { return EncodingUtils.readShort(buffer); @@ -193,16 +166,6 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody EncodingUtils.writeShort(buffer, s); } - protected byte readBitfield(DataInput buffer) throws IOException - { - return readByte(buffer); - } - - protected int readUnsignedShort(DataInput buffer) throws IOException - { - return buffer.readUnsignedShort(); - } - protected void writeBitfield(DataOutput buffer, byte bitfield0) throws IOException { buffer.writeByte(bitfield0); @@ -213,21 +176,12 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody EncodingUtils.writeUnsignedShort(buffer, s); } - protected long readUnsignedInteger(DataInput buffer) throws IOException - { - return EncodingUtils.readUnsignedInteger(buffer); - } protected void writeUnsignedInteger(DataOutput buffer, long i) throws IOException { EncodingUtils.writeUnsignedInteger(buffer, i); } - protected short readUnsignedByte(DataInput buffer) throws IOException - { - return (short) buffer.readUnsignedByte(); - } - protected void writeUnsignedByte(DataOutput buffer, short unsignedByte) throws IOException { EncodingUtils.writeUnsignedByte(buffer, unsignedByte); diff --git a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java index 695e41bc13..9a386d4eb4 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java @@ -46,8 +46,8 @@ public class AccessRequestBody extends AMQMethodBodyImpl implements EncodableAMQ // Constructor public AccessRequestBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _realm = readAMQShortString( buffer ); - _bitfield0 = readBitfield( buffer ); + _realm = buffer.readAMQShortString(); + _bitfield0 = buffer.readByte(); } public AccessRequestBody( @@ -165,4 +165,17 @@ public class AccessRequestBody extends AMQMethodBodyImpl implements EncodableAMQ return buf.toString(); } + public static <T> T process(final int channelId, + final MarkableDataInput buffer, + final MethodProcessor<T> dispatcher) throws IOException + { + AMQShortString realm = buffer.readAMQShortString(); + byte bitfield = buffer.readByte(); + boolean exclusive = (bitfield & 0x01) == 0x1 ; + boolean passive = (bitfield & 0x02) == 0x2 ; + boolean active = (bitfield & 0x04) == 0x4 ; + boolean write = (bitfield & 0x08) == 0x8 ; + boolean read = (bitfield & 0x10) == 0x10 ; + return dispatcher.accessRequest(channelId, realm, exclusive, passive, active, write, read); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java index c2bd5929fb..8439df9e92 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java @@ -45,7 +45,7 @@ public class AccessRequestOkBody extends AMQMethodBodyImpl implements EncodableA // Constructor public AccessRequestOkBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _ticket = readUnsignedShort( buffer ); + _ticket = buffer.readUnsignedShort(); } public AccessRequestOkBody( @@ -95,4 +95,10 @@ public class AccessRequestOkBody extends AMQMethodBodyImpl implements EncodableA return buf.toString(); } + public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) + throws IOException + { + int ticket = buffer.readUnsignedShort(); + return dispatcher.accessRequestOk(channelId, ticket); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java index b78e27996e..956d10bf0a 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java @@ -46,8 +46,8 @@ public class BasicAckBody extends AMQMethodBodyImpl implements EncodableAMQDataB // Constructor public BasicAckBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _deliveryTag = readLong( buffer ); - _bitfield0 = readBitfield( buffer ); + _deliveryTag = buffer.readLong(); + _bitfield0 = buffer.readByte(); } public BasicAckBody( @@ -112,4 +112,13 @@ public class BasicAckBody extends AMQMethodBodyImpl implements EncodableAMQDataB return buf.toString(); } + public static <T> T process(final int channelId, + final MarkableDataInput buffer, + final MethodProcessor<T> dispatcher) throws IOException + { + + long deliveryTag = buffer.readLong(); + boolean multiple = (buffer.readByte() & 0x01) != 0; + return dispatcher.basicAck(channelId, deliveryTag, multiple); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java index 70783712c2..1c619fd5d4 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java @@ -46,8 +46,8 @@ public class BasicCancelBody extends AMQMethodBodyImpl implements EncodableAMQDa // Constructor public BasicCancelBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _consumerTag = readAMQShortString( buffer ); - _bitfield0 = readBitfield( buffer ); + _consumerTag = buffer.readAMQShortString(); + _bitfield0 = buffer.readByte(); } public BasicCancelBody( @@ -113,4 +113,13 @@ public class BasicCancelBody extends AMQMethodBodyImpl implements EncodableAMQDa return buf.toString(); } + public static <T> T process(final int channelId, + final MarkableDataInput buffer, + final MethodProcessor<T> dispatcher) throws IOException + { + + AMQShortString consumerTag = buffer.readAMQShortString(); + boolean noWait = (buffer.readByte() & 0x01) == 0x01; + return dispatcher.basicCancel(channelId, consumerTag, noWait); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java index 7cec873cc4..c85223cd4f 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java @@ -45,7 +45,7 @@ public class BasicCancelOkBody extends AMQMethodBodyImpl implements EncodableAMQ // Constructor public BasicCancelOkBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _consumerTag = readAMQShortString( buffer ); + _consumerTag = buffer.readAMQShortString(); } public BasicCancelOkBody( @@ -96,4 +96,10 @@ public class BasicCancelOkBody extends AMQMethodBodyImpl implements EncodableAMQ return buf.toString(); } + public static <T> T process(final int channelId, final MarkableDataInput in, final MethodProcessor<T> dispatcher) + throws IOException + { + AMQShortString consumerTag = in.readAMQShortString(); + return dispatcher.basicCancelOk(channelId, consumerTag); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java index f6a4c7e659..1d6ec46c9a 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java @@ -49,11 +49,11 @@ public class BasicConsumeBody extends AMQMethodBodyImpl implements EncodableAMQD // Constructor public BasicConsumeBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _ticket = readUnsignedShort( buffer ); - _queue = readAMQShortString( buffer ); - _consumerTag = readAMQShortString( buffer ); - _bitfield0 = readBitfield( buffer ); - _arguments = readFieldTable( buffer ); + _ticket = buffer.readUnsignedShort(); + _queue = buffer.readAMQShortString(); + _consumerTag = buffer.readAMQShortString(); + _bitfield0 = buffer.readByte(); + _arguments = EncodingUtils.readFieldTable(buffer); } public BasicConsumeBody( @@ -191,4 +191,20 @@ public class BasicConsumeBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } + public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) + throws IOException, AMQFrameDecodingException + { + + int ticket = buffer.readUnsignedShort(); + AMQShortString queue = buffer.readAMQShortString(); + AMQShortString consumerTag = buffer.readAMQShortString(); + byte bitfield = buffer.readByte(); + + boolean noLocal = (bitfield & 0x01) == 0x01; + boolean noAck = (bitfield & 0x02) == 0x02; + boolean exclusive = (bitfield & 0x04) == 0x04; + boolean nowait = (bitfield & 0x08) == 0x08; + FieldTable arguments = EncodingUtils.readFieldTable(buffer); + return dispatcher.basicConsume(channelId, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java index b68de3a8de..b019574a6b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java @@ -45,7 +45,7 @@ public class BasicConsumeOkBody extends AMQMethodBodyImpl implements EncodableAM // Constructor public BasicConsumeOkBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _consumerTag = readAMQShortString( buffer ); + _consumerTag = buffer.readAMQShortString(); } public BasicConsumeOkBody( @@ -96,4 +96,10 @@ public class BasicConsumeOkBody extends AMQMethodBodyImpl implements EncodableAM return buf.toString(); } + public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) + throws IOException + { + AMQShortString consumerTag = buffer.readAMQShortString(); + return dispatcher.basicConsumeOk(channelId, consumerTag); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java index 987b8106d8..76cd9bfff4 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java @@ -49,11 +49,11 @@ public class BasicDeliverBody extends AMQMethodBodyImpl implements EncodableAMQD // Constructor public BasicDeliverBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _consumerTag = readAMQShortString( buffer ); - _deliveryTag = readLong( buffer ); - _bitfield0 = readBitfield( buffer ); - _exchange = readAMQShortString( buffer ); - _routingKey = readAMQShortString( buffer ); + _consumerTag = buffer.readAMQShortString(); + _deliveryTag = buffer.readLong(); + _bitfield0 = buffer.readByte(); + _exchange = buffer.readAMQShortString(); + _routingKey = buffer.readAMQShortString(); } public BasicDeliverBody( @@ -152,4 +152,16 @@ public class BasicDeliverBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } + public static <T> T process(final int channelId, + final MarkableDataInput buffer, + final MethodProcessor<T> dispatcher) throws IOException + { + + AMQShortString consumerTag = buffer.readAMQShortString(); + long deliveryTag = buffer.readLong(); + boolean redelivered = (buffer.readByte() & 0x01) != 0; + AMQShortString exchange = buffer.readAMQShortString(); + AMQShortString routingKey = buffer.readAMQShortString(); + return dispatcher.basicDeliver(channelId, consumerTag, deliveryTag, redelivered, exchange, routingKey); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java index 8b6842463e..2ebde34648 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java @@ -47,9 +47,9 @@ public class BasicGetBody extends AMQMethodBodyImpl implements EncodableAMQDataB // Constructor public BasicGetBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _ticket = readUnsignedShort( buffer ); - _queue = readAMQShortString( buffer ); - _bitfield0 = readBitfield( buffer ); + _ticket = buffer.readUnsignedShort(); + _queue = buffer.readAMQShortString(); + _bitfield0 = buffer.readByte(); } public BasicGetBody( @@ -125,4 +125,13 @@ public class BasicGetBody extends AMQMethodBodyImpl implements EncodableAMQDataB return buf.toString(); } + public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) + throws IOException + { + + int ticket = buffer.readUnsignedShort(); + AMQShortString queue = buffer.readAMQShortString(); + boolean noAck = (buffer.readByte() & 0x01) != 0; + return dispatcher.basicGet(channelId, queue, noAck); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java index c85876b260..508c3f8e66 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java @@ -45,7 +45,7 @@ public class BasicGetEmptyBody extends AMQMethodBodyImpl implements EncodableAMQ // Constructor public BasicGetEmptyBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _clusterId = readAMQShortString( buffer ); + _clusterId = buffer.readAMQShortString(); } public BasicGetEmptyBody( @@ -89,11 +89,18 @@ public class BasicGetEmptyBody extends AMQMethodBodyImpl implements EncodableAMQ public String toString() { - StringBuilder buf = new StringBuilder("[BasicGetEmptyBodyImpl: "); + StringBuilder buf = new StringBuilder("[BasicGetEmptyBody: "); buf.append( "clusterId=" ); buf.append( getClusterId() ); buf.append("]"); return buf.toString(); } + public static <T> T process(final int channelId, + final MarkableDataInput buffer, + final MethodProcessor<T> dispatcher) throws IOException + { + AMQShortString clusterId = buffer.readAMQShortString(); + return dispatcher.basicGetEmpty(channelId); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java index d3ba82b0d3..4020d8fb23 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java @@ -49,11 +49,11 @@ public class BasicGetOkBody extends AMQMethodBodyImpl implements EncodableAMQDat // Constructor public BasicGetOkBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _deliveryTag = readLong( buffer ); - _bitfield0 = readBitfield( buffer ); - _exchange = readAMQShortString( buffer ); - _routingKey = readAMQShortString( buffer ); - _messageCount = readUnsignedInteger( buffer ); + _deliveryTag = buffer.readLong(); + _bitfield0 = buffer.readByte(); + _exchange = buffer.readAMQShortString(); + _routingKey = buffer.readAMQShortString(); + _messageCount = EncodingUtils.readUnsignedInteger(buffer); } public BasicGetOkBody( @@ -151,4 +151,15 @@ public class BasicGetOkBody extends AMQMethodBodyImpl implements EncodableAMQDat return buf.toString(); } + public static <T> T process(final int channelId, + final MarkableDataInput buffer, + final MethodProcessor<T> dispatcher) throws IOException + { + long deliveryTag = buffer.readLong(); + boolean redelivered = (buffer.readByte() & 0x01) != 0; + AMQShortString exchange = buffer.readAMQShortString(); + AMQShortString routingKey = buffer.readAMQShortString(); + long messageCount = EncodingUtils.readUnsignedInteger(buffer); + return dispatcher.basicGetOk(channelId, deliveryTag, redelivered, exchange, routingKey, messageCount); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java index 7d24492395..7920da8405 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java @@ -48,10 +48,10 @@ public class BasicPublishBody extends AMQMethodBodyImpl implements EncodableAMQD // Constructor public BasicPublishBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _ticket = readUnsignedShort( buffer ); - _exchange = readAMQShortString( buffer ); - _routingKey = readAMQShortString( buffer ); - _bitfield0 = readBitfield( buffer ); + _ticket = buffer.readUnsignedShort(); + _exchange = buffer.readAMQShortString(); + _routingKey = buffer.readAMQShortString(); + _bitfield0 = buffer.readByte(); } public BasicPublishBody( @@ -151,4 +151,18 @@ public class BasicPublishBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } + public static <T> T process(final int channelId, + final MarkableDataInput buffer, + final MethodProcessor<T> dispatcher) throws IOException + { + + int ticket = buffer.readUnsignedShort(); + AMQShortString exchange = buffer.readAMQShortString(); + AMQShortString routingKey = buffer.readAMQShortString(); + byte bitfield = buffer.readByte(); + + boolean mandatory = (bitfield & 0x01) != 0; + boolean immediate = (bitfield & 0x02) != 0; + return dispatcher.basicPublish(channelId, exchange, routingKey, mandatory, immediate); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java index 76b30d1e3c..0843c5ccd7 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java @@ -47,9 +47,9 @@ public class BasicQosBody extends AMQMethodBodyImpl implements EncodableAMQDataB // Constructor public BasicQosBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _prefetchSize = readUnsignedInteger( buffer ); - _prefetchCount = readUnsignedShort( buffer ); - _bitfield0 = readBitfield( buffer ); + _prefetchSize = EncodingUtils.readUnsignedInteger(buffer); + _prefetchCount = buffer.readUnsignedShort(); + _bitfield0 = buffer.readByte(); } public BasicQosBody( @@ -124,4 +124,14 @@ public class BasicQosBody extends AMQMethodBodyImpl implements EncodableAMQDataB return buf.toString(); } + public static <T> T process(final int channelId, + final MarkableDataInput buffer, + final MethodProcessor<T> dispatcher) throws IOException + { + + long prefetchSize = EncodingUtils.readUnsignedInteger(buffer); + int prefetchCount = buffer.readUnsignedShort(); + boolean global = (buffer.readByte() & 0x01) == 0x01; + return dispatcher.basicQos(channelId, prefetchSize, prefetchCount, global); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java index fdda88534c..739470c658 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java @@ -45,7 +45,7 @@ public class BasicRecoverBody extends AMQMethodBodyImpl implements EncodableAMQD // Constructor public BasicRecoverBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _bitfield0 = readBitfield( buffer ); + _bitfield0 = buffer.readByte(); } public BasicRecoverBody( @@ -100,4 +100,14 @@ public class BasicRecoverBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } + public static <T> T process(final int channelId, + final MarkableDataInput in, + final ProtocolVersion protocolVersion, + final MethodProcessor<T> dispatcher) throws IOException + { + boolean requeue = (in.readByte() & 0x01) == 0x01; + boolean sync = (ProtocolVersion.v8_0.equals(protocolVersion)); + + return dispatcher.basicRecover(channelId, requeue, sync); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java index 4fa98ac2dc..5826bd9d16 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java @@ -46,7 +46,7 @@ public class BasicRecoverSyncBody extends AMQMethodBodyImpl implements Encodable public BasicRecoverSyncBody(MarkableDataInput buffer, ProtocolVersion protocolVersion) throws AMQFrameDecodingException, IOException { _methodId = ProtocolVersion.v0_9.equals(protocolVersion) ? 102 : 110; - _bitfield0 = readBitfield( buffer ); + _bitfield0 = buffer.readByte(); } public BasicRecoverSyncBody(ProtocolVersion protocolVersion, @@ -103,4 +103,11 @@ public class BasicRecoverSyncBody extends AMQMethodBodyImpl implements Encodable return buf.toString(); } + public static <T> T process(final int channelId, + final MarkableDataInput in, + final MethodProcessor<T> dispatcher) throws IOException + { + boolean requeue = (in.readByte() & 0x01) == 0x01; + return dispatcher.basicRecover(channelId, requeue, true); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncOkBody.java index 65c15d8cd7..dc60d53952 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncOkBody.java @@ -31,7 +31,6 @@ import java.io.DataOutput; import java.io.IOException; import org.apache.qpid.AMQException; -import org.apache.qpid.codec.MarkableDataInput; public class BasicRecoverSyncOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody { diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java index 162fae9dda..83f2727a51 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java @@ -46,8 +46,8 @@ public class BasicRejectBody extends AMQMethodBodyImpl implements EncodableAMQDa // Constructor public BasicRejectBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _deliveryTag = readLong( buffer ); - _bitfield0 = readBitfield( buffer ); + _deliveryTag = buffer.readLong(); + _bitfield0 = buffer.readByte(); } public BasicRejectBody( @@ -112,4 +112,13 @@ public class BasicRejectBody extends AMQMethodBodyImpl implements EncodableAMQDa return buf.toString(); } + public static <T> T process(final int channelId, + final MarkableDataInput buffer, + final MethodProcessor<T> dispatcher) throws IOException + { + + long deliveryTag = buffer.readLong(); + boolean requeue = (buffer.readByte() & 0x01) != 0; + return dispatcher.basicReject(channelId, deliveryTag, requeue); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java index 073bbceb06..67d6c77312 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java @@ -48,10 +48,10 @@ public class BasicReturnBody extends AMQMethodBodyImpl implements EncodableAMQDa // Constructor public BasicReturnBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _replyCode = readUnsignedShort( buffer ); - _replyText = readAMQShortString( buffer ); - _exchange = readAMQShortString( buffer ); - _routingKey = readAMQShortString( buffer ); + _replyCode = buffer.readUnsignedShort(); + _replyText = buffer.readAMQShortString(); + _exchange = buffer.readAMQShortString(); + _routingKey = buffer.readAMQShortString(); } public BasicReturnBody( @@ -134,4 +134,15 @@ public class BasicReturnBody extends AMQMethodBodyImpl implements EncodableAMQDa return buf.toString(); } + public static <T> T process(final int channelId, + final MarkableDataInput buffer, + final MethodProcessor<T> dispatcher) throws IOException + { + + int replyCode = buffer.readUnsignedShort(); + AMQShortString replyText = buffer.readAMQShortString(); + AMQShortString exchange = buffer.readAMQShortString(); + AMQShortString routingKey = buffer.readAMQShortString(); + return dispatcher.basicReturn(channelId, replyCode, replyText, exchange, routingKey); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java deleted file mode 100644 index 554e9373d8..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -import org.apache.qpid.codec.MarkableDataInput; - -import java.io.IOException; - -/** - * Any class that is capable of turning a stream of bytes into an AMQ structure must implement this interface. - */ -public interface BodyFactory -{ - AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException, IOException; -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java index 9f0e5ecdf5..d5c535b099 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java @@ -47,9 +47,9 @@ public class ChannelAlertBody extends AMQMethodBodyImpl implements EncodableAMQD // Constructor public ChannelAlertBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _replyCode = readUnsignedShort( buffer ); - _replyText = readAMQShortString( buffer ); - _details = readFieldTable( buffer ); + _replyCode = buffer.readUnsignedShort(); + _replyText = buffer.readAMQShortString(); + _details = EncodingUtils.readFieldTable(buffer); } public ChannelAlertBody( @@ -121,4 +121,13 @@ public class ChannelAlertBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } + public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) + throws IOException, AMQFrameDecodingException + { + + int replyCode = buffer.readUnsignedShort(); + AMQShortString replyText = buffer.readAMQShortString(); + FieldTable details = EncodingUtils.readFieldTable(buffer); + return dispatcher.channelAlert(channelId, replyCode, replyText, details); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java index 7c969ba3bb..ea1536ed2b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java @@ -48,10 +48,10 @@ public class ChannelCloseBody extends AMQMethodBodyImpl implements EncodableAMQD // Constructor public ChannelCloseBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _replyCode = readUnsignedShort( buffer ); - _replyText = readAMQShortString( buffer ); - _classId = readUnsignedShort( buffer ); - _methodId = readUnsignedShort( buffer ); + _replyCode = buffer.readUnsignedShort(); + _replyText = buffer.readAMQShortString(); + _classId = buffer.readUnsignedShort(); + _methodId = buffer.readUnsignedShort(); } public ChannelCloseBody( @@ -132,4 +132,15 @@ public class ChannelCloseBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } + public static <T> T process(final int channelId, + final MarkableDataInput buffer, + final MethodProcessor<T> dispatcher) throws IOException + { + + int replyCode = buffer.readUnsignedShort(); + AMQShortString replyText = buffer.readAMQShortString(); + int classId = buffer.readUnsignedShort(); + int methodId = buffer.readUnsignedShort(); + return dispatcher.channelClose(channelId, replyCode, replyText, classId, methodId); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseOkBody.java index d3d7dbc79b..e9b1572eef 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseOkBody.java @@ -35,6 +35,7 @@ import org.apache.qpid.codec.MarkableDataInput; public class ChannelCloseOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody { + public static final ChannelCloseOkBody INSTANCE = new ChannelCloseOkBody(); public static final int CLASS_ID = 20; public static final int METHOD_ID = 41; @@ -46,8 +47,7 @@ public class ChannelCloseOkBody extends AMQMethodBodyImpl implements EncodableAM { } - public ChannelCloseOkBody( - ) + private ChannelCloseOkBody() { } @@ -64,8 +64,7 @@ public class ChannelCloseOkBody extends AMQMethodBodyImpl implements EncodableAM protected int getBodySize() { - int size = 0; - return size; + return 0; } public void writeMethodPayload(DataOutput buffer) throws IOException @@ -79,9 +78,8 @@ public class ChannelCloseOkBody extends AMQMethodBodyImpl implements EncodableAM public String toString() { - StringBuilder buf = new StringBuilder("[ChannelCloseOkBodyImpl: "); - buf.append("]"); - return buf.toString(); + return "[ChannelCloseOkBody]"; + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java index 9973e45e5f..e70eb0ea35 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java @@ -40,24 +40,17 @@ public class ChannelFlowBody extends AMQMethodBodyImpl implements EncodableAMQDa public static final int METHOD_ID = 20; // Fields declared in specification - private final byte _bitfield0; // [active] + private final boolean _active; // [active] // Constructor public ChannelFlowBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _bitfield0 = readBitfield( buffer ); + _active = (buffer.readByte() & 0x01) == 0x01; } - public ChannelFlowBody( - boolean active - ) + public ChannelFlowBody(boolean active) { - byte bitfield0 = (byte)0; - if( active ) - { - bitfield0 = (byte) (((int) bitfield0) | (1 << 0)); - } - _bitfield0 = bitfield0; + _active = active; } public int getClazz() @@ -72,18 +65,17 @@ public class ChannelFlowBody extends AMQMethodBodyImpl implements EncodableAMQDa public final boolean getActive() { - return (((int)(_bitfield0)) & ( 1 << 0)) != 0; + return _active; } protected int getBodySize() { - int size = 1; - return size; + return 1; } public void writeMethodPayload(DataOutput buffer) throws IOException { - writeBitfield( buffer, _bitfield0 ); + writeBitfield( buffer, _active ? (byte)1 : (byte)0); } public boolean execute(MethodDispatcher dispatcher, int channelId) throws AMQException @@ -100,4 +92,11 @@ public class ChannelFlowBody extends AMQMethodBodyImpl implements EncodableAMQDa return buf.toString(); } + public static <T> T process(final int channelId, + final MarkableDataInput buffer, + final MethodProcessor<T> dispatcher) throws IOException + { + boolean active = (buffer.readByte() & 0x01) == 0x01; + return dispatcher.channelFlow(channelId, active); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java index 20c793c3f4..13bdf332d2 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java @@ -40,24 +40,17 @@ public class ChannelFlowOkBody extends AMQMethodBodyImpl implements EncodableAMQ public static final int METHOD_ID = 21; // Fields declared in specification - private final byte _bitfield0; // [active] + private final boolean _active; // [active] // Constructor public ChannelFlowOkBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _bitfield0 = readBitfield( buffer ); + _active = (buffer.readByte() & 0x01) == 0x01; } - public ChannelFlowOkBody( - boolean active - ) + public ChannelFlowOkBody(boolean active) { - byte bitfield0 = (byte)0; - if( active ) - { - bitfield0 = (byte) (((int) bitfield0) | (1 << 0)); - } - _bitfield0 = bitfield0; + _active = active; } public int getClazz() @@ -72,7 +65,7 @@ public class ChannelFlowOkBody extends AMQMethodBodyImpl implements EncodableAMQ public final boolean getActive() { - return (((int)(_bitfield0)) & ( 1 << 0)) != 0; + return _active; } protected int getBodySize() @@ -83,7 +76,7 @@ public class ChannelFlowOkBody extends AMQMethodBodyImpl implements EncodableAMQ public void writeMethodPayload(DataOutput buffer) throws IOException { - writeBitfield( buffer, _bitfield0 ); + writeBitfield( buffer, _active ? (byte)1 : (byte)0 ); } public boolean execute(MethodDispatcher dispatcher, int channelId) throws AMQException @@ -100,4 +93,10 @@ public class ChannelFlowOkBody extends AMQMethodBodyImpl implements EncodableAMQ return buf.toString(); } + public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) + throws IOException + { + boolean active = (buffer.readByte() & 0x01) == 0x01; + return dispatcher.channelFlowOk(channelId, active); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java index a04856d49b..f96eb9344b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java @@ -39,20 +39,17 @@ public class ChannelOpenBody extends AMQMethodBodyImpl implements EncodableAMQDa public static final int CLASS_ID = 20; public static final int METHOD_ID = 10; - // Fields declared in specification - private final AMQShortString _outOfBand; // [outOfBand] // Constructor public ChannelOpenBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _outOfBand = readAMQShortString( buffer ); + // ignore unused OOB string + buffer.readAMQShortString(); } - public ChannelOpenBody( - AMQShortString outOfBand - ) + public ChannelOpenBody() { - _outOfBand = outOfBand; + } public int getClazz() @@ -65,21 +62,14 @@ public class ChannelOpenBody extends AMQMethodBodyImpl implements EncodableAMQDa return METHOD_ID; } - public final AMQShortString getOutOfBand() - { - return _outOfBand; - } - protected int getBodySize() { - int size = 0; - size += getSizeOf( _outOfBand ); - return size; + return 1; } public void writeMethodPayload(DataOutput buffer) throws IOException { - writeAMQShortString( buffer, _outOfBand ); + writeAMQShortString( buffer, null ); } public boolean execute(MethodDispatcher dispatcher, int channelId) throws AMQException @@ -89,11 +79,14 @@ public class ChannelOpenBody extends AMQMethodBodyImpl implements EncodableAMQDa public String toString() { - StringBuilder buf = new StringBuilder("[ChannelOpenBodyImpl: "); - buf.append( "outOfBand=" ); - buf.append( getOutOfBand() ); - buf.append("]"); - return buf.toString(); + return "[ChannelOpenBody] "; } + public static <T> T process(final int channelId, + final MarkableDataInput buffer, + final MethodProcessor<T> dispatcher) throws IOException + { + buffer.readAMQShortString(); + return dispatcher.channelOpen(channelId); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java index a60e7f331b..5cf4d91970 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java @@ -96,4 +96,16 @@ public class ChannelOpenOkBody extends AMQMethodBodyImpl implements EncodableAMQ return "[ChannelOpenOkBody]"; } + public static <T> T process(final int channelId, + final MarkableDataInput in, + final ProtocolVersion protocolVersion, + final MethodProcessor<T> dispatcher) throws IOException + { + if(!ProtocolVersion.v8_0.equals(protocolVersion)) + { + EncodingUtils.readBytes(in); + } + + return dispatcher.channelOpenOk(channelId); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java index 64388a42f3..02f214cee9 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java @@ -49,10 +49,10 @@ public class ConnectionCloseBody extends AMQMethodBodyImpl implements EncodableA public ConnectionCloseBody(MarkableDataInput buffer, ProtocolVersion protocolVersion) throws AMQFrameDecodingException, IOException { _ownMethodId = ProtocolVersion.v8_0.equals(protocolVersion) ? 60 : 50; - _replyCode = readUnsignedShort( buffer ); - _replyText = readAMQShortString( buffer ); - _classId = readUnsignedShort( buffer ); - _methodId = readUnsignedShort( buffer ); + _replyCode = buffer.readUnsignedShort(); + _replyText = buffer.readAMQShortString(); + _classId = buffer.readUnsignedShort(); + _methodId = buffer.readUnsignedShort(); } public ConnectionCloseBody(ProtocolVersion protocolVersion, @@ -134,4 +134,12 @@ public class ConnectionCloseBody extends AMQMethodBodyImpl implements EncodableA return buf.toString(); } + public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException + { + int replyCode = buffer.readUnsignedShort(); + AMQShortString replyText = buffer.readAMQShortString(); + int classId = buffer.readUnsignedShort(); + int methodId = buffer.readUnsignedShort(); + return dispatcher.connectionClose(replyCode, replyText, classId, methodId); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java index df20a1e0b3..f9f55446dd 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java @@ -42,14 +42,14 @@ public class ConnectionOpenBody extends AMQMethodBodyImpl implements EncodableAM // Fields declared in specification private final AMQShortString _virtualHost; // [virtualHost] private final AMQShortString _capabilities; // [capabilities] - private final byte _bitfield0; // [insist] + private final boolean _insist; // [insist] // Constructor public ConnectionOpenBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _virtualHost = readAMQShortString( buffer ); - _capabilities = readAMQShortString( buffer ); - _bitfield0 = readBitfield( buffer ); + _virtualHost = buffer.readAMQShortString(); + _capabilities = buffer.readAMQShortString(); + _insist = (buffer.readByte() & 0x01) == 0x01; } public ConnectionOpenBody( @@ -60,12 +60,7 @@ public class ConnectionOpenBody extends AMQMethodBodyImpl implements EncodableAM { _virtualHost = virtualHost; _capabilities = capabilities; - byte bitfield0 = (byte)0; - if( insist ) - { - bitfield0 = (byte) (((int) bitfield0) | (1 << 0)); - } - _bitfield0 = bitfield0; + _insist = insist; } public int getClazz() @@ -88,7 +83,7 @@ public class ConnectionOpenBody extends AMQMethodBodyImpl implements EncodableAM } public final boolean getInsist() { - return (((int)(_bitfield0)) & ( 1 << 0)) != 0; + return _insist; } protected int getBodySize() @@ -103,7 +98,7 @@ public class ConnectionOpenBody extends AMQMethodBodyImpl implements EncodableAM { writeAMQShortString( buffer, _virtualHost ); writeAMQShortString( buffer, _capabilities ); - writeBitfield( buffer, _bitfield0 ); + writeBitfield( buffer, _insist ? (byte)1 : (byte)0); } public boolean execute(MethodDispatcher dispatcher, int channelId) throws AMQException @@ -126,4 +121,12 @@ public class ConnectionOpenBody extends AMQMethodBodyImpl implements EncodableAM return buf.toString(); } + public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException + { + + AMQShortString virtualHost = buffer.readAMQShortString(); + AMQShortString capabilities = buffer.readAMQShortString(); + boolean insist = (buffer.readByte() & 0x01) == 0x01; + return dispatcher.connectionOpen(virtualHost, capabilities, insist); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java index 68cb424f3b..3f04da7a29 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java @@ -45,7 +45,7 @@ public class ConnectionOpenOkBody extends AMQMethodBodyImpl implements Encodable // Constructor public ConnectionOpenOkBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _knownHosts = readAMQShortString( buffer ); + _knownHosts = buffer.readAMQShortString(); } public ConnectionOpenOkBody( @@ -96,4 +96,10 @@ public class ConnectionOpenOkBody extends AMQMethodBodyImpl implements Encodable return buf.toString(); } + public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException + { + AMQShortString knownHosts = buffer.readAMQShortString(); + return dispatcher.connectionOpenOk(knownHosts); + + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java index 569e9d1cc1..80c655683f 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java @@ -47,8 +47,8 @@ public class ConnectionRedirectBody extends AMQMethodBodyImpl implements Encodab public ConnectionRedirectBody(MarkableDataInput buffer, ProtocolVersion protocolVersion) throws AMQFrameDecodingException, IOException { _ownMethodId = ProtocolVersion.v8_0.equals(protocolVersion) ? 50 : 42; - _host = readAMQShortString( buffer ); - _knownHosts = readAMQShortString( buffer ); + _host = buffer.readAMQShortString(); + _knownHosts = buffer.readAMQShortString(); } public ConnectionRedirectBody(ProtocolVersion protocolVersion, AMQShortString host, AMQShortString knownHosts) @@ -108,4 +108,10 @@ public class ConnectionRedirectBody extends AMQMethodBodyImpl implements Encodab return buf.toString(); } + public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException + { + AMQShortString host = buffer.readAMQShortString(); + AMQShortString knownHosts = buffer.readAMQShortString(); + return dispatcher.connectionRedirect(host, knownHosts); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java index 18faeacf34..ca208d5a89 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java @@ -45,7 +45,7 @@ public class ConnectionSecureBody extends AMQMethodBodyImpl implements Encodable // Constructor public ConnectionSecureBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _challenge = readBytes( buffer ); + _challenge = EncodingUtils.readBytes(buffer); } public ConnectionSecureBody( @@ -96,4 +96,11 @@ public class ConnectionSecureBody extends AMQMethodBodyImpl implements Encodable return buf.toString(); } + public static <T> T process(final MarkableDataInput in, final MethodProcessor<T> dispatcher) + throws IOException, AMQFrameDecodingException + + { + byte[] challenge = EncodingUtils.readBytes(in); + return dispatcher.connectionSecure(challenge); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java index 1131567b06..0a2bfa613e 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java @@ -45,7 +45,7 @@ public class ConnectionSecureOkBody extends AMQMethodBodyImpl implements Encodab // Constructor public ConnectionSecureOkBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _response = readBytes( buffer ); + _response = EncodingUtils.readBytes(buffer); } public ConnectionSecureOkBody( @@ -96,4 +96,9 @@ public class ConnectionSecureOkBody extends AMQMethodBodyImpl implements Encodab return buf.toString(); } + public static <T> T process(final MarkableDataInput in, final MethodProcessor<T> dispatcher) throws IOException + { + byte[] response = EncodingUtils.readBytes(in); + return dispatcher.connectionSecureOk(response); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java index a445dd953a..17a568d737 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java @@ -46,16 +46,6 @@ public class ConnectionStartBody extends AMQMethodBodyImpl implements EncodableA private final byte[] _mechanisms; // [mechanisms] private final byte[] _locales; // [locales] - // Constructor - public ConnectionStartBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException - { - _versionMajor = readUnsignedByte( buffer ); - _versionMinor = readUnsignedByte( buffer ); - _serverProperties = readFieldTable( buffer ); - _mechanisms = readBytes( buffer ); - _locales = readBytes( buffer ); - } - public ConnectionStartBody( short versionMajor, short versionMinor, @@ -146,4 +136,16 @@ public class ConnectionStartBody extends AMQMethodBodyImpl implements EncodableA return buf.toString(); } + public static <T> T process(final MarkableDataInput in, final MethodProcessor<T> dispatcher) + throws IOException, AMQFrameDecodingException + { + short versionMajor = (short) in.readUnsignedByte(); + short versionMinor = (short) in.readUnsignedByte(); + FieldTable serverProperties = EncodingUtils.readFieldTable(in); + byte[] mechanisms = EncodingUtils.readBytes(in); + byte[] locales = EncodingUtils.readBytes(in); + + + return dispatcher.connectionStart(versionMajor, versionMinor, serverProperties, mechanisms, locales); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java index e2284a7a5e..ba8182e569 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java @@ -45,15 +45,6 @@ public class ConnectionStartOkBody extends AMQMethodBodyImpl implements Encodabl private final byte[] _response; // [response] private final AMQShortString _locale; // [locale] - // Constructor - public ConnectionStartOkBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException - { - _clientProperties = readFieldTable( buffer ); - _mechanism = readAMQShortString( buffer ); - _response = readBytes( buffer ); - _locale = readAMQShortString( buffer ); - } - public ConnectionStartOkBody( FieldTable clientProperties, AMQShortString mechanism, @@ -135,4 +126,15 @@ public class ConnectionStartOkBody extends AMQMethodBodyImpl implements Encodabl return buf.toString(); } + public static <T> T process(final MarkableDataInput in, final MethodProcessor<T> dispatcher) + throws IOException, AMQFrameDecodingException + { + + FieldTable clientProperties = EncodingUtils.readFieldTable(in); + AMQShortString mechanism = in.readAMQShortString(); + byte[] response = EncodingUtils.readBytes(in); + AMQShortString locale = in.readAMQShortString(); + + return dispatcher.connectionStartOk(clientProperties, mechanism, response, locale); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java index 2b6a67c4f6..2ca8e57e18 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java @@ -47,9 +47,9 @@ public class ConnectionTuneBody extends AMQMethodBodyImpl implements EncodableAM // Constructor public ConnectionTuneBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _channelMax = readUnsignedShort( buffer ); - _frameMax = readUnsignedInteger( buffer ); - _heartbeat = readUnsignedShort( buffer ); + _channelMax = buffer.readUnsignedShort(); + _frameMax = EncodingUtils.readUnsignedInteger(buffer); + _heartbeat = buffer.readUnsignedShort(); } public ConnectionTuneBody( @@ -119,4 +119,12 @@ public class ConnectionTuneBody extends AMQMethodBodyImpl implements EncodableAM return buf.toString(); } + public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException + { + + int channelMax = buffer.readUnsignedShort(); + long frameMax = EncodingUtils.readUnsignedInteger(buffer); + int heartbeat = buffer.readUnsignedShort(); + return dispatcher.connectionTune(channelMax, frameMax, heartbeat); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java index 84ab6027e5..7a259b6419 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java @@ -47,9 +47,9 @@ public class ConnectionTuneOkBody extends AMQMethodBodyImpl implements Encodable // Constructor public ConnectionTuneOkBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _channelMax = readUnsignedShort( buffer ); - _frameMax = readUnsignedInteger( buffer ); - _heartbeat = readUnsignedShort( buffer ); + _channelMax = buffer.readUnsignedShort(); + _frameMax = EncodingUtils.readUnsignedInteger(buffer); + _heartbeat = buffer.readUnsignedShort(); } public ConnectionTuneOkBody( @@ -119,4 +119,12 @@ public class ConnectionTuneOkBody extends AMQMethodBodyImpl implements Encodable return buf.toString(); } + public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException + { + + int channelMax = buffer.readUnsignedShort(); + long frameMax = EncodingUtils.readUnsignedInteger(buffer); + int heartbeat = buffer.readUnsignedShort(); + return dispatcher.connectionTuneOk(channelMax, frameMax, heartbeat); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index 6d6ec708d0..dc345a6cc6 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -20,15 +20,16 @@ */ package org.apache.qpid.framing; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; - import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.codec.MarkableDataInput; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + public class ContentBody implements AMQBody { public static final byte TYPE = 3; @@ -91,6 +92,16 @@ public class ContentBody implements AMQBody return _payload; } + public static <T> T process(final int channel, + final MarkableDataInput in, + final MethodProcessor<T> methodProcessor, final long bodySize) + throws IOException + { + byte[] payload = new byte[(int)bodySize]; + in.readFully(payload); + return methodProcessor.messageContent(channel, payload); + } + private static class BufferContentBody implements AMQBody { private final int _length; diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java deleted file mode 100644 index 10df105ee6..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.codec.MarkableDataInput; - -import java.io.IOException; - -public class ContentBodyFactory implements BodyFactory -{ - private static final Logger _log = LoggerFactory.getLogger(AMQMethodBodyFactory.class); - - private static final ContentBodyFactory _instance = new ContentBodyFactory(); - - public static ContentBodyFactory getInstance() - { - return _instance; - } - - private ContentBodyFactory() - { - _log.debug("Creating content body factory"); - } - - public AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException, IOException - { - return new ContentBody(in, bodySize); - } -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java index f2a443d5fd..081b4bdfee 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -20,54 +20,45 @@ */ package org.apache.qpid.framing; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; - import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; +import org.apache.qpid.AMQException; +import org.apache.qpid.codec.MarkableDataInput; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + public class ContentHeaderBody implements AMQBody { public static final byte TYPE = 2; + public static final int CLASS_ID = 60; - private int classId; - - private int weight; - - private long bodySize; + private long _bodySize; /** must never be null */ - private BasicContentHeaderProperties properties; - - public ContentHeaderBody() - { - } + private BasicContentHeaderProperties _properties; public ContentHeaderBody(DataInput buffer, long size) throws AMQFrameDecodingException, IOException { - classId = buffer.readUnsignedShort(); - weight = buffer.readUnsignedShort(); - bodySize = buffer.readLong(); + buffer.readUnsignedShort(); + buffer.readUnsignedShort(); + _bodySize = buffer.readLong(); int propertyFlags = buffer.readUnsignedShort(); ContentHeaderPropertiesFactory factory = ContentHeaderPropertiesFactory.getInstance(); - properties = factory.createContentHeaderProperties(classId, propertyFlags, buffer, (int)size - 14); + _properties = factory.createContentHeaderProperties(CLASS_ID, propertyFlags, buffer, (int)size - 14); } - - public ContentHeaderBody(BasicContentHeaderProperties props, int classId) + public ContentHeaderBody(BasicContentHeaderProperties props) { - properties = props; - this.classId = classId; + _properties = props; } - public ContentHeaderBody(int classId, int weight, BasicContentHeaderProperties props, long bodySize) + public ContentHeaderBody(BasicContentHeaderProperties props, long bodySize) { - this(props, classId); - this.weight = weight; - this.bodySize = bodySize; + _properties = props; + _bodySize = bodySize; } public byte getFrameType() @@ -95,16 +86,16 @@ public class ContentHeaderBody implements AMQBody public int getSize() { - return 2 + 2 + 8 + 2 + properties.getPropertyListSize(); + return 2 + 2 + 8 + 2 + _properties.getPropertyListSize(); } public void writePayload(DataOutput buffer) throws IOException { - EncodingUtils.writeUnsignedShort(buffer, classId); - EncodingUtils.writeUnsignedShort(buffer, weight); - buffer.writeLong(bodySize); - EncodingUtils.writeUnsignedShort(buffer, properties.getPropertyFlags()); - properties.writePropertyListPayload(buffer); + EncodingUtils.writeUnsignedShort(buffer, CLASS_ID); + EncodingUtils.writeUnsignedShort(buffer, 0); + buffer.writeLong(_bodySize); + EncodingUtils.writeUnsignedShort(buffer, _properties.getPropertyFlags()); + _properties.writePropertyListPayload(buffer); } public void handle(final int channelId, final AMQVersionAwareProtocolSession session) @@ -113,46 +104,42 @@ public class ContentHeaderBody implements AMQBody session.contentHeaderReceived(channelId, this); } - public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties, + public static AMQFrame createAMQFrame(int channelId, + BasicContentHeaderProperties properties, long bodySize) { - return new AMQFrame(channelId, new ContentHeaderBody(classId, weight, properties, bodySize)); - } - - public static AMQFrame createAMQFrame(int channelId, ContentHeaderBody body) - { - return new AMQFrame(channelId, body); + return new AMQFrame(channelId, new ContentHeaderBody(properties, bodySize)); } public BasicContentHeaderProperties getProperties() { - return properties; + return _properties; } public void setProperties(BasicContentHeaderProperties props) { - properties = props; + _properties = props; } @Override public String toString() { return "ContentHeaderBody{" + - "classId=" + classId + - ", weight=" + weight + - ", bodySize=" + bodySize + - ", properties=" + properties + + "classId=" + CLASS_ID + + ", weight=" + 0 + + ", bodySize=" + _bodySize + + ", properties=" + _properties + '}'; } public int getClassId() { - return classId; + return CLASS_ID; } public int getWeight() { - return weight; + return 0; } /** unsigned long but java can't handle that anyway when allocating byte array @@ -160,11 +147,34 @@ public class ContentHeaderBody implements AMQBody * @return the body size */ public long getBodySize() { - return bodySize; + return _bodySize; } public void setBodySize(long bodySize) { - this.bodySize = bodySize; + _bodySize = bodySize; + } + + public static <T> T process(final int channelId, + final MarkableDataInput buffer, + final MethodProcessor<T> methodProcessor, final long size) + throws IOException, AMQFrameDecodingException + { + + int classId = buffer.readUnsignedShort(); + buffer.readUnsignedShort(); + long bodySize = buffer.readLong(); + int propertyFlags = buffer.readUnsignedShort(); + + BasicContentHeaderProperties properties; + + if (classId != BasicConsumeBody.CLASS_ID) + { + throw new AMQFrameDecodingException(null, "Unsupported content header class id: " + classId, null); + } + properties = new BasicContentHeaderProperties(); + properties.populatePropertiesFromBuffer(buffer, propertyFlags, (int)(size-14)); + + return methodProcessor.messageHeader(channelId, properties, bodySize); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java deleted file mode 100644 index 83a5211013..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.codec.MarkableDataInput; - -import java.io.IOException; - -public class ContentHeaderBodyFactory implements BodyFactory -{ - private static final Logger _log = LoggerFactory.getLogger(AMQMethodBodyFactory.class); - - private static final ContentHeaderBodyFactory _instance = new ContentHeaderBodyFactory(); - - public static ContentHeaderBodyFactory getInstance() - { - return _instance; - } - - private ContentHeaderBodyFactory() - { - _log.debug("Creating content header body factory"); - } - - public AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException, IOException - { - // all content headers are the same - it is only the properties that differ. - // the content header body further delegates construction of properties - return new ContentHeaderBody(in, bodySize); - } -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java index 028e3c83be..8244768fb6 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java @@ -47,9 +47,9 @@ public class ExchangeBoundBody extends AMQMethodBodyImpl implements EncodableAMQ // Constructor public ExchangeBoundBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _exchange = readAMQShortString( buffer ); - _routingKey = readAMQShortString( buffer ); - _queue = readAMQShortString( buffer ); + _exchange = buffer.readAMQShortString(); + _routingKey = buffer.readAMQShortString(); + _queue = buffer.readAMQShortString(); } public ExchangeBoundBody( @@ -122,4 +122,13 @@ public class ExchangeBoundBody extends AMQMethodBodyImpl implements EncodableAMQ return buf.toString(); } + public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) + throws IOException + { + + AMQShortString exchange = buffer.readAMQShortString(); + AMQShortString routingKey = buffer.readAMQShortString(); + AMQShortString queue = buffer.readAMQShortString(); + return dispatcher.exchangeBound(channelId, exchange, routingKey, queue); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java index fa12cdf2cf..2d89a9f467 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java @@ -46,8 +46,8 @@ public class ExchangeBoundOkBody extends AMQMethodBodyImpl implements EncodableA // Constructor public ExchangeBoundOkBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _replyCode = readUnsignedShort( buffer ); - _replyText = readAMQShortString( buffer ); + _replyCode = buffer.readUnsignedShort(); + _replyText = buffer.readAMQShortString(); } public ExchangeBoundOkBody( @@ -108,4 +108,12 @@ public class ExchangeBoundOkBody extends AMQMethodBodyImpl implements EncodableA return buf.toString(); } + public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) + throws IOException + { + + int replyCode = buffer.readUnsignedShort(); + AMQShortString replyText = buffer.readAMQShortString(); + return dispatcher.exchangeBoundOk(channelId, replyCode, replyText); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java index 23f428a381..f96e6d382e 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java @@ -49,11 +49,11 @@ public class ExchangeDeclareBody extends AMQMethodBodyImpl implements EncodableA // Constructor public ExchangeDeclareBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _ticket = readUnsignedShort( buffer ); - _exchange = readAMQShortString( buffer ); - _type = readAMQShortString( buffer ); - _bitfield0 = readBitfield( buffer ); - _arguments = readFieldTable( buffer ); + _ticket = buffer.readUnsignedShort(); + _exchange = buffer.readAMQShortString(); + _type = buffer.readAMQShortString(); + _bitfield0 = buffer.readByte(); + _arguments = EncodingUtils.readFieldTable(buffer); } public ExchangeDeclareBody( @@ -204,4 +204,21 @@ public class ExchangeDeclareBody extends AMQMethodBodyImpl implements EncodableA return buf.toString(); } + public static <T> T process(final int channelId, + final MarkableDataInput buffer, + final MethodProcessor<T> dispatcher) throws IOException, AMQFrameDecodingException + { + + int ticket = buffer.readUnsignedShort(); + AMQShortString exchange = buffer.readAMQShortString(); + AMQShortString type = buffer.readAMQShortString(); + byte bitfield = buffer.readByte(); + boolean passive = (bitfield & 0x1) == 0x1; + boolean durable = (bitfield & 0x2) == 0x2; + boolean autoDelete = (bitfield & 0x4) == 0x4; + boolean internal = (bitfield & 0x8) == 0x8; + boolean nowait = (bitfield & 0x10) == 0x10; + FieldTable arguments = EncodingUtils.readFieldTable(buffer); + return dispatcher.exchangeDeclare(channelId, exchange, type, passive, durable, autoDelete, internal, nowait, arguments); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java index 98b0ba30f0..771fa63063 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java @@ -47,9 +47,9 @@ public class ExchangeDeleteBody extends AMQMethodBodyImpl implements EncodableAM // Constructor public ExchangeDeleteBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _ticket = readUnsignedShort( buffer ); - _exchange = readAMQShortString( buffer ); - _bitfield0 = readBitfield( buffer ); + _ticket = buffer.readUnsignedShort(); + _exchange = buffer.readAMQShortString(); + _bitfield0 = buffer.readByte(); } public ExchangeDeleteBody( @@ -138,4 +138,15 @@ public class ExchangeDeleteBody extends AMQMethodBodyImpl implements EncodableAM return buf.toString(); } + public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) + throws IOException + { + + int ticket = buffer.readUnsignedShort(); + AMQShortString exchange = buffer.readAMQShortString(); + byte bitfield = buffer.readByte(); + boolean ifUnused = (bitfield & 0x01) == 0x01; + boolean nowait = (bitfield & 0x02) == 0x02; + return dispatcher.exchangeDelete(channelId, exchange, ifUnused, nowait); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java new file mode 100644 index 0000000000..c8b7d2639d --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java @@ -0,0 +1,503 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +public class FrameCreatingMethodProcessor implements MethodProcessor<AMQFrame> +{ + private final MethodRegistry _methodRegistry; + + FrameCreatingMethodProcessor(final MethodRegistry methodRegistry) + { + _methodRegistry = methodRegistry; + } + + @Override + public AMQFrame connectionStart(final short versionMajor, + final short versionMinor, + final FieldTable serverProperties, + final byte[] mechanisms, + final byte[] locales) + { + return new AMQFrame(0, new ConnectionStartBody(versionMajor, versionMinor, serverProperties, mechanisms, locales)); + } + + @Override + public AMQFrame connectionStartOk(final FieldTable clientProperties, + final AMQShortString mechanism, + final byte[] response, + final AMQShortString locale) + { + return new AMQFrame(0, new ConnectionStartOkBody(clientProperties, mechanism, response, locale)); + } + + @Override + public AMQFrame txSelect(final int channelId) + { + return new AMQFrame(channelId, TxSelectBody.INSTANCE); + } + + @Override + public AMQFrame txSelectOk(final int channelId) + { + return new AMQFrame(channelId, TxSelectOkBody.INSTANCE); + } + + @Override + public AMQFrame txCommit(final int channelId) + { + return new AMQFrame(channelId, TxCommitBody.INSTANCE); + } + + @Override + public AMQFrame txCommitOk(final int channelId) + { + return new AMQFrame(channelId, TxCommitOkBody.INSTANCE); + } + + @Override + public AMQFrame txRollback(final int channelId) + { + return new AMQFrame(channelId, TxRollbackBody.INSTANCE); + } + + @Override + public AMQFrame txRollbackOk(final int channelId) + { + return new AMQFrame(channelId, TxRollbackOkBody.INSTANCE); + } + + @Override + public AMQFrame connectionSecure(final byte[] challenge) + { + return new AMQFrame(0, new ConnectionSecureBody(challenge)); + } + + @Override + public AMQFrame connectionSecureOk(final byte[] response) + { + return new AMQFrame(0, new ConnectionSecureOkBody(response)); + } + + @Override + public AMQFrame connectionTune(final int channelMax, final long frameMax, final int heartbeat) + { + return new AMQFrame(0, new ConnectionTuneBody(channelMax, frameMax, heartbeat)); + } + + @Override + public AMQFrame connectionTuneOk(final int channelMax, final long frameMax, final int heartbeat) + { + return new AMQFrame(0, new ConnectionTuneOkBody(channelMax, frameMax, heartbeat)); + } + + @Override + public AMQFrame connectionOpen(final AMQShortString virtualHost, + final AMQShortString capabilities, + final boolean insist) + { + return new AMQFrame(0, new ConnectionOpenBody(virtualHost, capabilities, insist)); + } + + @Override + public AMQFrame connectionOpenOk(final AMQShortString knownHosts) + { + return new AMQFrame(0, new ConnectionOpenOkBody(knownHosts)); + } + + @Override + public AMQFrame connectionRedirect(final AMQShortString host, final AMQShortString knownHosts) + { + return new AMQFrame(0, new ConnectionRedirectBody(getProtocolVersion(), host, knownHosts)); + } + + @Override + public AMQFrame connectionClose(final int replyCode, + final AMQShortString replyText, + final int classId, + final int methodId) + { + return new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), replyCode, replyText, classId, methodId)); + } + + @Override + public AMQFrame connectionCloseOk() + { + return new AMQFrame(0, ProtocolVersion.v8_0.equals(getProtocolVersion()) + ? ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_8 + : ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_9); + } + + @Override + public AMQFrame channelOpen(final int channelId) + { + return new AMQFrame(channelId, new ChannelOpenBody()); + } + + @Override + public AMQFrame channelOpenOk(final int channelId) + { + return new AMQFrame(channelId, ProtocolVersion.v8_0.equals(getProtocolVersion()) + ? ChannelOpenOkBody.INSTANCE_0_8 + : ChannelOpenOkBody.INSTANCE_0_9); + } + + @Override + public AMQFrame channelFlow(final int channelId, final boolean active) + { + return new AMQFrame(channelId, new ChannelFlowBody(active)); + } + + @Override + public AMQFrame channelFlowOk(final int channelId, final boolean active) + { + return new AMQFrame(channelId, new ChannelFlowOkBody(active)); + } + + @Override + public AMQFrame channelAlert(final int channelId, + final int replyCode, + final AMQShortString replyText, + final FieldTable details) + { + return new AMQFrame(channelId, new ChannelAlertBody(replyCode, replyText, details)); + } + + @Override + public AMQFrame channelClose(final int channelId, + final int replyCode, + final AMQShortString replyText, + final int classId, + final int methodId) + { + return new AMQFrame(channelId, new ChannelCloseBody(replyCode, replyText, classId, methodId)); + } + + @Override + public AMQFrame channelCloseOk(final int channelId) + { + return new AMQFrame(channelId, ChannelCloseOkBody.INSTANCE); + } + + @Override + public AMQFrame accessRequest(final int channelId, + final AMQShortString realm, + final boolean exclusive, + final boolean passive, + final boolean active, + final boolean write, + final boolean read) + { + return new AMQFrame(channelId, new AccessRequestBody(realm, exclusive, passive, active, write, read)); + } + + @Override + public AMQFrame accessRequestOk(final int channelId, final int ticket) + { + return new AMQFrame(channelId, new AccessRequestOkBody(ticket)); + } + + @Override + public AMQFrame exchangeDeclare(final int channelId, + final AMQShortString exchange, + final AMQShortString type, + final boolean passive, + final boolean durable, + final boolean autoDelete, + final boolean internal, + final boolean nowait, final FieldTable arguments) + { + return new AMQFrame(channelId, new ExchangeDeclareBody(0, exchange, type, passive, durable, autoDelete, internal, nowait, arguments)); + } + + @Override + public AMQFrame exchangeDeclareOk(final int channelId) + { + return new AMQFrame(channelId, new ExchangeDeclareOkBody()); + } + + @Override + public AMQFrame exchangeDelete(final int channelId, + final AMQShortString exchange, + final boolean ifUnused, + final boolean nowait) + { + return new AMQFrame(channelId, new ExchangeDeleteBody(0, exchange, ifUnused, nowait)); + } + + @Override + public AMQFrame exchangeDeleteOk(final int channelId) + { + return new AMQFrame(channelId, new ExchangeDeleteOkBody()); + } + + @Override + public AMQFrame exchangeBound(final int channelId, + final AMQShortString exchange, + final AMQShortString routingKey, + final AMQShortString queue) + { + return new AMQFrame(channelId, new ExchangeBoundBody(exchange, routingKey, queue)); + } + + @Override + public AMQFrame exchangeBoundOk(final int channelId, final int replyCode, final AMQShortString replyText) + { + return new AMQFrame(channelId, new ExchangeBoundOkBody(replyCode, replyText)); + } + + @Override + public AMQFrame queueBindOk(final int channelId) + { + return new AMQFrame(channelId, new QueueBindOkBody()); + } + + @Override + public AMQFrame queueUnbindOk(final int channelId) + { + return new AMQFrame(channelId, new QueueUnbindOkBody()); + } + + @Override + public AMQFrame queueDeclare(final int channelId, + final AMQShortString queue, + final boolean passive, + final boolean durable, + final boolean exclusive, + final boolean autoDelete, + final boolean nowait, + final FieldTable arguments) + { + return new AMQFrame(channelId, new QueueDeclareBody(0, queue, passive, durable, exclusive, autoDelete, nowait, arguments)); + } + + @Override + public AMQFrame queueDeclareOk(final int channelId, + final AMQShortString queue, + final long messageCount, + final long consumerCount) + { + return new AMQFrame(channelId, new QueueDeclareOkBody(queue, messageCount, consumerCount)); + } + + @Override + public AMQFrame queueBind(final int channelId, + final AMQShortString queue, + final AMQShortString exchange, + final AMQShortString bindingKey, + final boolean nowait, + final FieldTable arguments) + { + return new AMQFrame(channelId, new QueueBindBody(0, queue, exchange, bindingKey, nowait, arguments)); + } + + @Override + public AMQFrame queuePurge(final int channelId, final AMQShortString queue, final boolean nowait) + { + return new AMQFrame(channelId, new QueuePurgeBody(0, queue, nowait)); + } + + @Override + public AMQFrame queuePurgeOk(final int channelId, final long messageCount) + { + return new AMQFrame(channelId, new QueuePurgeOkBody(messageCount)); + } + + @Override + public AMQFrame queueDelete(final int channelId, + final AMQShortString queue, + final boolean ifUnused, + final boolean ifEmpty, + final boolean nowait) + { + return new AMQFrame(channelId, new QueueDeleteBody(0, queue, ifUnused, ifEmpty, nowait)); + } + + @Override + public AMQFrame queueDeleteOk(final int channelId, final long messageCount) + { + return new AMQFrame(channelId, new QueueDeleteOkBody(messageCount)); + } + + @Override + public AMQFrame queueUnbind(final int channelId, + final AMQShortString queue, + final AMQShortString exchange, + final AMQShortString bindingKey, + final FieldTable arguments) + { + return new AMQFrame(channelId, new QueueUnbindBody(0, queue, exchange, bindingKey, arguments)); + } + + @Override + public AMQFrame basicRecoverSyncOk(final int channelId) + { + return new AMQFrame(channelId, new BasicRecoverSyncOkBody(getProtocolVersion())); + } + + @Override + public AMQFrame basicRecover(final int channelId, final boolean requeue, final boolean sync) + { + if(ProtocolVersion.v8_0.equals(getProtocolVersion()) || !sync) + { + return new AMQFrame(channelId, new BasicRecoverBody(requeue)); + } + else + { + return new AMQFrame(channelId, new BasicRecoverSyncBody(getProtocolVersion(), requeue)); + } + } + + @Override + public AMQFrame basicQos(final int channelId, + final long prefetchSize, + final int prefetchCount, + final boolean global) + { + return new AMQFrame(channelId, new BasicQosBody(prefetchSize, prefetchCount, global)); + } + + @Override + public AMQFrame basicQosOk(final int channelId) + { + return new AMQFrame(channelId, new BasicQosOkBody()); + } + + @Override + public AMQFrame basicConsume(final int channelId, + final AMQShortString queue, + final AMQShortString consumerTag, + final boolean noLocal, + final boolean noAck, + final boolean exclusive, + final boolean nowait, + final FieldTable arguments) + { + return new AMQFrame(channelId, new BasicConsumeBody(0, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments)); + } + + @Override + public AMQFrame basicConsumeOk(final int channelId, final AMQShortString consumerTag) + { + return new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag)); + } + + @Override + public AMQFrame basicCancel(final int channelId, final AMQShortString consumerTag, final boolean noWait) + { + return new AMQFrame(channelId, new BasicCancelBody(consumerTag, noWait)); + } + + @Override + public AMQFrame basicCancelOk(final int channelId, final AMQShortString consumerTag) + { + return new AMQFrame(channelId, new BasicCancelOkBody(consumerTag)); + } + + @Override + public AMQFrame basicPublish(final int channelId, + final AMQShortString exchange, + final AMQShortString routingKey, + final boolean mandatory, + final boolean immediate) + { + return new AMQFrame(channelId, new BasicPublishBody(0, exchange, routingKey, mandatory, immediate)); + } + + @Override + public AMQFrame basicReturn(final int channelId, final int replyCode, + final AMQShortString replyText, + final AMQShortString exchange, + final AMQShortString routingKey) + { + return new AMQFrame(channelId, new BasicReturnBody(replyCode, replyText, exchange, routingKey)); + } + + @Override + public AMQFrame basicDeliver(final int channelId, + final AMQShortString consumerTag, + final long deliveryTag, + final boolean redelivered, + final AMQShortString exchange, + final AMQShortString routingKey) + { + return new AMQFrame(channelId, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey)); + } + + @Override + public AMQFrame basicGet(final int channelId, final AMQShortString queue, final boolean noAck) + { + return new AMQFrame(channelId, new BasicGetBody(0, queue, noAck)); + } + + @Override + public AMQFrame basicGetOk(final int channelId, + final long deliveryTag, + final boolean redelivered, + final AMQShortString exchange, + final AMQShortString routingKey, + final long messageCount) + { + return new AMQFrame(channelId, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount)); + } + + @Override + public AMQFrame basicGetEmpty(final int channelId) + { + return new AMQFrame(channelId, new BasicGetEmptyBody((AMQShortString)null)); + } + + @Override + public AMQFrame basicAck(final int channelId, final long deliveryTag, final boolean multiple) + { + return new AMQFrame(channelId, new BasicAckBody(deliveryTag, multiple)); + } + + @Override + public AMQFrame basicReject(final int channelId, final long deliveryTag, final boolean requeue) + { + return new AMQFrame(channelId, new BasicRejectBody(deliveryTag, requeue)); + } + + @Override + public AMQFrame heartbeat() + { + return new AMQFrame(0, new HeartbeatBody()); + } + + private ProtocolVersion getProtocolVersion() + { + return _methodRegistry.getProtocolVersion(); + } + + @Override + public AMQFrame messageContent(final int channelId, final byte[] data) + { + return new AMQFrame(channelId, new ContentBody(data)); + } + + @Override + public AMQFrame messageHeader(final int channelId, + final BasicContentHeaderProperties properties, + final long bodySize) + { + return new AMQFrame(channelId, new ContentHeaderBody(properties, bodySize)); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java index 1613cd055e..23f71c62db 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java @@ -20,13 +20,14 @@ */ package org.apache.qpid.framing; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; - import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; +import org.apache.qpid.AMQException; +import org.apache.qpid.codec.MarkableDataInput; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + public class HeartbeatBody implements AMQBody { public static final byte TYPE = 8; @@ -79,4 +80,17 @@ public class HeartbeatBody implements AMQBody { return new AMQFrame(0, this); } + + public static <T> T process(final int channel, + final MarkableDataInput in, + final MethodProcessor<T> processor, + final long bodySize) throws IOException + { + + if(bodySize > 0) + { + in.skip(bodySize); + } + return processor.heartbeat(); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java deleted file mode 100644 index 971caca41a..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -import org.apache.qpid.codec.MarkableDataInput; - -public class HeartbeatBodyFactory implements BodyFactory -{ - public AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException - { - return new HeartbeatBody(); - } - -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java new file mode 100644 index 0000000000..ecedacaba4 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java @@ -0,0 +1,197 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +public interface MethodProcessor<T> +{ + T connectionStart(short versionMajor, + short versionMinor, + FieldTable serverProperties, + byte[] mechanisms, + byte[] locales); + + T connectionStartOk(FieldTable clientProperties, + AMQShortString mechanism, + byte[] response, + AMQShortString locale); + + T txSelect(int channelId); + + T txSelectOk(int channelId); + + T txCommit(int channelId); + + T txCommitOk(int channelId); + + T txRollback(int channelId); + + T txRollbackOk(int channelId); + + T connectionSecure(byte[] challenge); + + T connectionSecureOk(byte[] response); + + T connectionTune(int channelMax, long frameMax, int heartbeat); + + T connectionTuneOk(int channelMax, long frameMax, int heartbeat); + + T connectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist); + + T connectionOpenOk(AMQShortString knownHosts); + + T connectionRedirect(AMQShortString host, AMQShortString knownHosts); + + T connectionClose(int replyCode, AMQShortString replyText, int classId, int methodId); + + T connectionCloseOk(); + + T channelOpen(int channelId); + + T channelOpenOk(int channelId); + + T channelFlow(int channelId, boolean active); + + T channelFlowOk(int channelId, boolean active); + + T channelAlert(int channelId, int replyCode, final AMQShortString replyText, FieldTable details); + + T channelClose(int channelId, int replyCode, AMQShortString replyText, int classId, int methodId); + + T channelCloseOk(int channelId); + + T accessRequest(int channelId, + AMQShortString realm, + boolean exclusive, + boolean passive, + boolean active, + boolean write, boolean read); + + T accessRequestOk(int channelId, int ticket); + + T exchangeDeclare(int channelId, + AMQShortString exchange, + AMQShortString type, + boolean passive, + boolean durable, + boolean autoDelete, boolean internal, boolean nowait, final FieldTable arguments); + + T exchangeDeclareOk(int channelId); + + T exchangeDelete(int channelId, AMQShortString exchange, boolean ifUnused, boolean nowait); + + T exchangeDeleteOk(int channelId); + + T exchangeBound(int channelId, AMQShortString exchange, AMQShortString routingKey, AMQShortString queue); + + T exchangeBoundOk(int channelId, int replyCode, AMQShortString replyText); + + T queueBindOk(int channelId); + + T queueUnbindOk(final int channelId); + + T queueDeclare(int channelId, + AMQShortString queue, + boolean passive, + boolean durable, + boolean exclusive, + boolean autoDelete, boolean nowait, FieldTable arguments); + + T queueDeclareOk(int channelId, final AMQShortString queue, long messageCount, long consumerCount); + + T queueBind(int channelId, + AMQShortString queue, + AMQShortString exchange, + AMQShortString bindingKey, + boolean nowait, FieldTable arguments); + + T queuePurge(int channelId, AMQShortString queue, boolean nowait); + + T queuePurgeOk(int channelId, long messageCount); + + T queueDelete(int channelId, AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait); + + T queueDeleteOk(int channelId, long messageCount); + + T queueUnbind(int channelId, + AMQShortString queue, + AMQShortString exchange, + AMQShortString bindingKey, + FieldTable arguments); + + T basicRecoverSyncOk(int channelId); + + T basicRecover(int channelId, final boolean requeue, boolean sync); + + T basicQos(int channelId, long prefetchSize, int prefetchCount, boolean global); + + T basicQosOk(int channelId); + + T basicConsume(int channelId, + AMQShortString queue, + AMQShortString consumerTag, + boolean noLocal, + boolean noAck, + boolean exclusive, boolean nowait, FieldTable arguments); + + T basicConsumeOk(int channelId, AMQShortString consumerTag); + + T basicCancel(int channelId, AMQShortString consumerTag, boolean noWait); + + T basicCancelOk(int channelId, AMQShortString consumerTag); + + T basicPublish(int channelId, + AMQShortString exchange, + AMQShortString routingKey, + boolean mandatory, + boolean immediate); + + T basicReturn(final int channelId, + int replyCode, + AMQShortString replyText, + AMQShortString exchange, + AMQShortString routingKey); + + T basicDeliver(int channelId, + AMQShortString consumerTag, + long deliveryTag, + boolean redelivered, + AMQShortString exchange, AMQShortString routingKey); + + T basicGet(int channelId, AMQShortString queue, boolean noAck); + + T basicGetOk(int channelId, + long deliveryTag, + boolean redelivered, + AMQShortString exchange, + AMQShortString routingKey, long messageCount); + + T basicGetEmpty(int channelId); + + T basicAck(int channelId, long deliveryTag, boolean multiple); + + T basicReject(int channelId, long deliveryTag, boolean requeue); + + T heartbeat(); + + T messageContent(int channelId, byte[] data); + + T messageHeader(int channelId, BasicContentHeaderProperties properties, long bodySize); +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java b/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java index 0fad853194..c4fd131d0e 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java +++ b/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java @@ -29,302 +29,23 @@ package org.apache.qpid.framing; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import org.apache.qpid.codec.MarkableDataInput; -import org.apache.qpid.protocol.AMQConstant; - - public final class MethodRegistry { - private static final Map<ProtocolVersion, MethodRegistry> _registries = new HashMap<>(); - - - public static final MethodRegistry registry_0_9 = - new MethodRegistry(ProtocolVersion.v0_9); - - public static final MethodRegistry registry_0_91 = - new MethodRegistry(ProtocolVersion.v0_91); - - public static final MethodRegistry registry_8_0 = - new MethodRegistry(ProtocolVersion.v8_0); - - private final ProtocolVersion _protocolVersion; - - public final AMQMethodBody convertToBody(MarkableDataInput in, long size) - throws AMQFrameDecodingException, IOException - { - final int classAndMethod = in.readInt(); - - AMQMethodBody methodBody; - switch(classAndMethod) - { - //CONNECTION_CLASS: - case 0x000a000a: - methodBody = new ConnectionStartBody(in); - break; - case 0x000a000b: - methodBody = new ConnectionStartOkBody(in); - break; - case 0x000a0014: - methodBody = new ConnectionSecureBody(in); - break; - case 0x000a0015: - methodBody = new ConnectionSecureOkBody(in); - break; - case 0x000a001e: - methodBody = new ConnectionTuneBody(in); - break; - case 0x000a001f: - methodBody = new ConnectionTuneOkBody(in); - break; - case 0x000a0028: - methodBody = new ConnectionOpenBody(in); - break; - case 0x000a0029: - methodBody = new ConnectionOpenOkBody(in); - break; - case 0x000a002a: - methodBody = new ConnectionRedirectBody(in, _protocolVersion); - break; - case 0x000a0032: - if(_protocolVersion.equals(ProtocolVersion.v8_0)) - { - methodBody = new ConnectionRedirectBody(in, _protocolVersion); - } - else - { - methodBody = new ConnectionCloseBody(in, _protocolVersion); - } - break; - case 0x000a0033: - if(_protocolVersion.equals(ProtocolVersion.v8_0)) - { - throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF)); - } - else - { - methodBody = ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_9; - } - break; - case 0x000a003c: - if(_protocolVersion.equals(ProtocolVersion.v8_0)) - { - methodBody = new ConnectionCloseBody(in, _protocolVersion); - } - else - { - throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF)); - } - break; - case 0x000a003d: - if(_protocolVersion.equals(ProtocolVersion.v8_0)) - { - methodBody = ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_8; - } - else - { - throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF)); - } - break; - - // CHANNEL_CLASS: - - case 0x0014000a: - methodBody = new ChannelOpenBody(in); - break; - case 0x0014000b: - methodBody = ChannelOpenOkBody.getInstance(_protocolVersion, in); - break; - case 0x00140014: - methodBody = new ChannelFlowBody(in); - break; - case 0x00140015: - methodBody = new ChannelFlowOkBody(in); - break; - case 0x0014001e: - methodBody = new ChannelAlertBody(in); - break; - case 0x00140028: - methodBody = new ChannelCloseBody(in); - break; - case 0x00140029: - methodBody = new ChannelCloseOkBody(in); - break; - // ACCESS_CLASS: - - case 0x001e000a: - methodBody = new AccessRequestBody(in); - break; - case 0x001e000b: - methodBody = new AccessRequestOkBody(in); - break; - - // EXCHANGE_CLASS: - - case 0x0028000a: - methodBody = new ExchangeDeclareBody(in); - break; - case 0x0028000b: - methodBody = new ExchangeDeclareOkBody(in); - break; - case 0x00280014: - methodBody = new ExchangeDeleteBody(in); - break; - case 0x00280015: - methodBody = new ExchangeDeleteOkBody(in); - break; - case 0x00280016: - methodBody = new ExchangeBoundBody(in); - break; - case 0x00280017: - methodBody = new ExchangeBoundOkBody(in); - break; - - - // QUEUE_CLASS: - - case 0x0032000a: - methodBody = new QueueDeclareBody(in); - break; - case 0x0032000b: - methodBody = new QueueDeclareOkBody(in); - break; - case 0x00320014: - methodBody = new QueueBindBody(in); - break; - case 0x00320015: - methodBody = new QueueBindOkBody(in); - break; - case 0x0032001e: - methodBody = new QueuePurgeBody(in); - break; - case 0x0032001f: - methodBody = new QueuePurgeOkBody(in); - break; - case 0x00320028: - methodBody = new QueueDeleteBody(in); - break; - case 0x00320029: - methodBody = new QueueDeleteOkBody(in); - break; - case 0x00320032: - methodBody = new QueueUnbindBody(in); - break; - case 0x00320033: - methodBody = new QueueUnbindOkBody(in); - break; - - - // BASIC_CLASS: - - case 0x003c000a: - methodBody = new BasicQosBody(in); - break; - case 0x003c000b: - methodBody = new BasicQosOkBody(in); - break; - case 0x003c0014: - methodBody = new BasicConsumeBody(in); - break; - case 0x003c0015: - methodBody = new BasicConsumeOkBody(in); - break; - case 0x003c001e: - methodBody = new BasicCancelBody(in); - break; - case 0x003c001f: - methodBody = new BasicCancelOkBody(in); - break; - case 0x003c0028: - methodBody = new BasicPublishBody(in); - break; - case 0x003c0032: - methodBody = new BasicReturnBody(in); - break; - case 0x003c003c: - methodBody = new BasicDeliverBody(in); - break; - case 0x003c0046: - methodBody = new BasicGetBody(in); - break; - case 0x003c0047: - methodBody = new BasicGetOkBody(in); - break; - case 0x003c0048: - methodBody = new BasicGetEmptyBody(in); - break; - case 0x003c0050: - methodBody = new BasicAckBody(in); - break; - case 0x003c005a: - methodBody = new BasicRejectBody(in); - break; - case 0x003c0064: - methodBody = new BasicRecoverBody(in); - break; - case 0x003c0065: - methodBody = new BasicRecoverSyncOkBody(_protocolVersion); - break; - case 0x003c0066: - methodBody = new BasicRecoverSyncBody(in, _protocolVersion); - break; - case 0x003c006e: - methodBody = new BasicRecoverSyncBody(in, _protocolVersion); - break; - case 0x003c006f: - methodBody = new BasicRecoverSyncOkBody(_protocolVersion); - break; - - // TX_CLASS: - - case 0x005a000a: - methodBody = TxSelectBody.INSTANCE; - break; - case 0x005a000b: - methodBody = TxSelectOkBody.INSTANCE; - break; - case 0x005a0014: - methodBody = TxCommitBody.INSTANCE; - break; - case 0x005a0015: - methodBody = TxCommitOkBody.INSTANCE; - break; - case 0x005a001e: - methodBody = TxRollbackBody.INSTANCE; - break; - case 0x005a001f: - methodBody = TxRollbackOkBody.INSTANCE; - break; - - default: - throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF)); - - } - return methodBody; - } - - private AMQFrameDecodingException newUnknownMethodException(final int classId, final int methodId) - { - return new AMQFrameDecodingException(AMQConstant.COMMAND_INVALID, - "Method " + methodId + " unknown in AMQP version " + _protocolVersion - + " (while trying to decode class " + classId + " method " + methodId + "."); - } - - private MethodRegistry(ProtocolVersion pv) - { - _registries.put(pv, this); + private final FrameCreatingMethodProcessor _methodProcessor; + private ProtocolVersion _protocolVersion; + + + public MethodRegistry(ProtocolVersion pv) + { _protocolVersion = pv; + _methodProcessor = new FrameCreatingMethodProcessor(this); } - public static MethodRegistry getMethodRegistry(ProtocolVersion pv) + public void setProtocolVersion(final ProtocolVersion protocolVersion) { - return _registries.get(pv); + _protocolVersion = protocolVersion; } - public final AccessRequestBody createAccessRequestBody(final AMQShortString realm, final boolean exclusive, final boolean passive, @@ -502,7 +223,7 @@ public final class MethodRegistry public final ChannelOpenBody createChannelOpenBody(final AMQShortString outOfBand) { - return new ChannelOpenBody(outOfBand); + return new ChannelOpenBody(); } public final ChannelOpenOkBody createChannelOpenOkBody(byte[] channelId) @@ -540,7 +261,7 @@ public final class MethodRegistry public final ChannelCloseOkBody createChannelCloseOkBody() { - return new ChannelCloseOkBody(); + return ChannelCloseOkBody.INSTANCE; } @@ -829,4 +550,15 @@ public final class MethodRegistry return TxRollbackOkBody.INSTANCE; } + public ProtocolVersion getProtocolVersion() + { + return _protocolVersion; + } + + public FrameCreatingMethodProcessor getMethodProcessor() + { + return _methodProcessor; + } + + } diff --git a/java/common/src/main/java/org/apache/qpid/framing/MethodRegistrySource.java b/java/common/src/main/java/org/apache/qpid/framing/MethodRegistrySource.java deleted file mode 100644 index 4d2eda68b2..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/MethodRegistrySource.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -public interface MethodRegistrySource -{ - public MethodRegistry getMethodRegistry(); -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java index ddcaf6290e..42e1c44d7d 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java @@ -50,12 +50,12 @@ public class QueueBindBody extends AMQMethodBodyImpl implements EncodableAMQData // Constructor public QueueBindBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _ticket = readUnsignedShort( buffer ); - _queue = readAMQShortString( buffer ); - _exchange = readAMQShortString( buffer ); - _routingKey = readAMQShortString( buffer ); - _bitfield0 = readBitfield( buffer ); - _arguments = readFieldTable( buffer ); + _ticket = buffer.readUnsignedShort(); + _queue = buffer.readAMQShortString(); + _exchange = buffer.readAMQShortString(); + _routingKey = buffer.readAMQShortString(); + _bitfield0 = buffer.readByte(); + _arguments = EncodingUtils.readFieldTable(buffer); } public QueueBindBody( @@ -165,4 +165,17 @@ public class QueueBindBody extends AMQMethodBodyImpl implements EncodableAMQData return buf.toString(); } + public static <T> T process(final int channelId, + final MarkableDataInput buffer, + final MethodProcessor<T> dispatcher) throws IOException, AMQFrameDecodingException + { + + int ticket = buffer.readUnsignedShort(); + AMQShortString queue = buffer.readAMQShortString(); + AMQShortString exchange = buffer.readAMQShortString(); + AMQShortString bindingKey = buffer.readAMQShortString(); + boolean nowait = (buffer.readByte() & 0x01) == 0x01; + FieldTable arguments = EncodingUtils.readFieldTable(buffer); + return dispatcher.queueBind(channelId, queue, exchange, bindingKey, nowait, arguments); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java index c7322aa71c..3a8d2f41a5 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java @@ -48,10 +48,10 @@ public class QueueDeclareBody extends AMQMethodBodyImpl implements EncodableAMQD // Constructor public QueueDeclareBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _ticket = readUnsignedShort( buffer ); - _queue = readAMQShortString( buffer ); - _bitfield0 = readBitfield( buffer ); - _arguments = readFieldTable( buffer ); + _ticket = buffer.readUnsignedShort(); + _queue = buffer.readAMQShortString(); + _bitfield0 = buffer.readByte(); + _arguments = EncodingUtils.readFieldTable(buffer); } public QueueDeclareBody( @@ -191,4 +191,21 @@ public class QueueDeclareBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } + public static <T> T process(final int channelId, + final MarkableDataInput buffer, + final MethodProcessor<T> dispatcher) throws IOException, AMQFrameDecodingException + { + + int ticket = buffer.readUnsignedShort(); + AMQShortString queue = buffer.readAMQShortString(); + byte bitfield = buffer.readByte(); + + boolean passive = (bitfield & 0x01 ) == 0x01; + boolean durable = (bitfield & 0x02 ) == 0x02; + boolean exclusive = (bitfield & 0x04 ) == 0x04; + boolean autoDelete = (bitfield & 0x08 ) == 0x08; + boolean nowait = (bitfield & 0x010 ) == 0x010; + FieldTable arguments = EncodingUtils.readFieldTable(buffer); + return dispatcher.queueDeclare(channelId, queue, passive, durable, exclusive, autoDelete, nowait, arguments); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java index 7ee65e377a..47deb9cd6d 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java @@ -47,9 +47,9 @@ public class QueueDeclareOkBody extends AMQMethodBodyImpl implements EncodableAM // Constructor public QueueDeclareOkBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _queue = readAMQShortString( buffer ); - _messageCount = readUnsignedInteger( buffer ); - _consumerCount = readUnsignedInteger( buffer ); + _queue = buffer.readAMQShortString(); + _messageCount = EncodingUtils.readUnsignedInteger(buffer); + _consumerCount = EncodingUtils.readUnsignedInteger(buffer); } public QueueDeclareOkBody( @@ -120,4 +120,13 @@ public class QueueDeclareOkBody extends AMQMethodBodyImpl implements EncodableAM return buf.toString(); } + public static <T> T process(final int channelId, + final MarkableDataInput buffer, + final MethodProcessor<T> dispatcher) throws IOException + { + AMQShortString queue = buffer.readAMQShortString(); + long messageCount = EncodingUtils.readUnsignedInteger(buffer); + long consumerCount = EncodingUtils.readUnsignedInteger(buffer); + return dispatcher.queueDeclareOk(channelId, queue, messageCount, consumerCount); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java index 6e534fd556..fc9795f48b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java @@ -47,9 +47,9 @@ public class QueueDeleteBody extends AMQMethodBodyImpl implements EncodableAMQDa // Constructor public QueueDeleteBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _ticket = readUnsignedShort( buffer ); - _queue = readAMQShortString( buffer ); - _bitfield0 = readBitfield( buffer ); + _ticket = buffer.readUnsignedShort(); + _queue = buffer.readAMQShortString(); + _bitfield0 = buffer.readByte(); } public QueueDeleteBody( @@ -151,4 +151,18 @@ public class QueueDeleteBody extends AMQMethodBodyImpl implements EncodableAMQDa return buf.toString(); } + public static <T> T process(final int channelId, + final MarkableDataInput buffer, + final MethodProcessor<T> dispatcher) throws IOException + { + + int ticket = buffer.readUnsignedShort(); + AMQShortString queue = buffer.readAMQShortString(); + byte bitfield = buffer.readByte(); + + boolean ifUnused = (bitfield & 0x01) == 0x01; + boolean ifEmpty = (bitfield & 0x02) == 0x02; + boolean nowait = (bitfield & 0x04) == 0x04; + return dispatcher.queueDelete(channelId, queue, ifUnused, ifEmpty, nowait); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java index cb58db5de6..b04f844084 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java @@ -45,7 +45,7 @@ public class QueueDeleteOkBody extends AMQMethodBodyImpl implements EncodableAMQ // Constructor public QueueDeleteOkBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _messageCount = readUnsignedInteger( buffer ); + _messageCount = EncodingUtils.readUnsignedInteger(buffer); } public QueueDeleteOkBody( @@ -95,4 +95,11 @@ public class QueueDeleteOkBody extends AMQMethodBodyImpl implements EncodableAMQ return buf.toString(); } + public static <T> T process(final int channelId, + final MarkableDataInput buffer, + final MethodProcessor<T> dispatcher) throws IOException + { + long messageCount = EncodingUtils.readUnsignedInteger(buffer); + return dispatcher.queueDeleteOk(channelId, messageCount); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java index 713b0b24ad..d2f41922cc 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java @@ -47,9 +47,9 @@ public class QueuePurgeBody extends AMQMethodBodyImpl implements EncodableAMQDat // Constructor public QueuePurgeBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _ticket = readUnsignedShort( buffer ); - _queue = readAMQShortString( buffer ); - _bitfield0 = readBitfield( buffer ); + _ticket = buffer.readUnsignedShort(); + _queue = buffer.readAMQShortString(); + _bitfield0 = buffer.readByte(); } public QueuePurgeBody( @@ -125,4 +125,14 @@ public class QueuePurgeBody extends AMQMethodBodyImpl implements EncodableAMQDat return buf.toString(); } + public static <T> T process(final int channelId, + final MarkableDataInput buffer, + final MethodProcessor<T> dispatcher) throws IOException + { + + int ticket = buffer.readUnsignedShort(); + AMQShortString queue = buffer.readAMQShortString(); + boolean nowait = (buffer.readByte() & 0x01) == 0x01; + return dispatcher.queuePurge(channelId, queue, nowait); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java index c2bc1caf14..da5ba766ae 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java @@ -45,7 +45,7 @@ public class QueuePurgeOkBody extends AMQMethodBodyImpl implements EncodableAMQD // Constructor public QueuePurgeOkBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _messageCount = readUnsignedInteger( buffer ); + _messageCount = EncodingUtils.readUnsignedInteger(buffer); } public QueuePurgeOkBody( @@ -95,4 +95,11 @@ public class QueuePurgeOkBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } + public static <T> T process(final int channelId, + final MarkableDataInput buffer, + final MethodProcessor<T> dispatcher) throws IOException + { + long messageCount = EncodingUtils.readUnsignedInteger(buffer); + return dispatcher.queuePurgeOk(channelId, messageCount); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java index a5aba58f15..968cc02212 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java @@ -49,11 +49,11 @@ public class QueueUnbindBody extends AMQMethodBodyImpl implements EncodableAMQDa // Constructor public QueueUnbindBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { - _ticket = readUnsignedShort( buffer ); - _queue = readAMQShortString( buffer ); - _exchange = readAMQShortString( buffer ); - _routingKey = readAMQShortString( buffer ); - _arguments = readFieldTable( buffer ); + _ticket = buffer.readUnsignedShort(); + _queue = buffer.readAMQShortString(); + _exchange = buffer.readAMQShortString(); + _routingKey = buffer.readAMQShortString(); + _arguments = EncodingUtils.readFieldTable(buffer); } public QueueUnbindBody( @@ -147,4 +147,16 @@ public class QueueUnbindBody extends AMQMethodBodyImpl implements EncodableAMQDa return buf.toString(); } + public static <T> T process(final int channelId, + final MarkableDataInput buffer, + final MethodProcessor<T> dispatcher) throws IOException, AMQFrameDecodingException + { + + int ticket = buffer.readUnsignedShort(); + AMQShortString queue = buffer.readAMQShortString(); + AMQShortString exchange = buffer.readAMQShortString(); + AMQShortString routingKey = buffer.readAMQShortString(); + FieldTable arguments = EncodingUtils.readFieldTable(buffer); + return dispatcher.queueUnbind(channelId, queue, exchange, routingKey, arguments); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindOkBody.java index 27c49a0804..2e504d6fc7 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindOkBody.java @@ -27,11 +27,11 @@ package org.apache.qpid.framing; -import org.apache.qpid.codec.MarkableDataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.qpid.AMQException; +import org.apache.qpid.codec.MarkableDataInput; public class QueueUnbindOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody { |