diff options
Diffstat (limited to 'java/common/src/main')
58 files changed, 1593 insertions, 1096 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java index b7904303b5..9d98168687 100644 --- a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java @@ -30,14 +30,8 @@ import java.util.ArrayList; import java.util.List; import java.util.ListIterator; -import org.apache.qpid.framing.AMQDataBlockDecoder; -import org.apache.qpid.framing.AMQFrameDecodingException; -import org.apache.qpid.framing.AMQProtocolVersionException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ByteArrayDataInput; -import org.apache.qpid.framing.EncodingUtils; -import org.apache.qpid.framing.MethodProcessor; -import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.*; +import org.apache.qpid.protocol.AMQConstant; /** * AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a @@ -51,12 +45,9 @@ import org.apache.qpid.framing.ProtocolInitiation; * TODO If protocol initiation decoder not needed, then don't create it. Probably not a big deal, but it adds to the * per-session overhead. */ -public class AMQDecoder +public abstract class AMQDecoder<T extends MethodProcessor> { - private final MethodProcessor _methodProcessor; - - /** Holds the 'normal' AMQP data decoder. */ - private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder(); + private final T _methodProcessor; /** Holds the protocol initiation decoder. */ private ProtocolInitiation.Decoder _piDecoder = new ProtocolInitiation.Decoder(); @@ -67,6 +58,8 @@ public class AMQDecoder private boolean _firstRead = true; + private int _maxFrameSize = AMQConstant.FRAME_MIN_SIZE.getCode(); + private List<ByteArrayInputStream> _remainingBufs = new ArrayList<ByteArrayInputStream>(); /** @@ -75,7 +68,7 @@ public class AMQDecoder * @param expectProtocolInitiation <tt>true</tt> if this decoder needs to handle protocol initiation. * @param methodProcessor method processor */ - public AMQDecoder(boolean expectProtocolInitiation, MethodProcessor methodProcessor) + protected AMQDecoder(boolean expectProtocolInitiation, T methodProcessor) { _expectProtocolInitiation = expectProtocolInitiation; _methodProcessor = methodProcessor; @@ -96,7 +89,12 @@ public class AMQDecoder public void setMaxFrameSize(final int frameMax) { - _dataBlockDecoder.setMaxFrameSize(frameMax); + _maxFrameSize = frameMax; + } + + public T getMethodProcessor() + { + return _methodProcessor; } private class RemainingByteArrayInputStream extends InputStream @@ -254,10 +252,10 @@ public class AMQDecoder { if(!_expectProtocolInitiation) { - enoughData = _dataBlockDecoder.decodable(msg); + enoughData = decodable(msg); if (enoughData) { - _dataBlockDecoder.processInput(_methodProcessor, msg); + processInput(msg); } } else @@ -303,4 +301,105 @@ public class AMQDecoder } } } + + private boolean decodable(final MarkableDataInput in) throws AMQFrameDecodingException, IOException + { + final int remainingAfterAttributes = in.available() - (1 + 2 + 4 + 1); + // type, channel, body length and end byte + if (remainingAfterAttributes < 0) + { + return false; + } + + in.mark(8); + in.skip(1 + 2); + + + // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt() + final long bodySize = in.readInt() & 0xffffffffL; + if (bodySize > _maxFrameSize) + { + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, + "Incoming frame size of " + + bodySize + + " is larger than negotiated maximum of " + + _maxFrameSize); + } + in.reset(); + + return (remainingAfterAttributes >= bodySize); + + } + + private void processInput(final MarkableDataInput in) + throws AMQFrameDecodingException, AMQProtocolVersionException, IOException + { + final byte type = in.readByte(); + + final int channel = in.readUnsignedShort(); + final long bodySize = EncodingUtils.readUnsignedInteger(in); + + // bodySize can be zero + if ((channel < 0) || (bodySize < 0)) + { + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, + "Undecodable frame: type = " + type + " channel = " + channel + + " bodySize = " + bodySize); + } + + processFrame(channel, type, bodySize, in); + + byte marker = in.readByte(); + if ((marker & 0xFF) != 0xCE) + { + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, + "End of frame marker not found. Read " + marker + " length=" + bodySize + + " type=" + type); + } + + } + + protected void processFrame(final int channel, final byte type, final long bodySize, final MarkableDataInput in) + throws AMQFrameDecodingException, IOException + { + switch (type) + { + case 1: + processMethod(channel, in); + break; + case 2: + ContentHeaderBody.process(in, _methodProcessor.getChannelMethodProcessor(channel), bodySize); + break; + case 3: + ContentBody.process(in, _methodProcessor.getChannelMethodProcessor(channel), bodySize); + break; + case 8: + HeartbeatBody.process(channel, in, _methodProcessor, bodySize); + break; + default: + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Unsupported frame type: " + type); + } + } + + + abstract void processMethod(int channelId, + MarkableDataInput in) + throws AMQFrameDecodingException, IOException; + + AMQFrameDecodingException newUnknownMethodException(final int classId, + final int methodId, + ProtocolVersion protocolVersion) + { + return new AMQFrameDecodingException(AMQConstant.COMMAND_INVALID, + "Method " + + methodId + + " unknown in AMQP version " + + protocolVersion + + " (while trying to decode class " + + classId + + " method " + + methodId + + "."); + } + } diff --git a/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java new file mode 100644 index 0000000000..5048193cac --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java @@ -0,0 +1,258 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.codec; + +import java.io.IOException; + +import org.apache.qpid.framing.*; + +public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends ClientChannelMethodProcessor>> +{ + + /** + * Creates a new AMQP decoder. + * + * @param methodProcessor method processor + */ + public ClientDecoder(final ClientMethodProcessor<? extends ClientChannelMethodProcessor> methodProcessor) + { + super(false, methodProcessor); + } + + + void processMethod(int channelId, + MarkableDataInput in) + throws AMQFrameDecodingException, IOException + { + ClientMethodProcessor<? extends ClientChannelMethodProcessor> methodProcessor = getMethodProcessor(); + ClientChannelMethodProcessor channelMethodProcessor = methodProcessor.getChannelMethodProcessor(channelId); + final int classAndMethod = in.readInt(); + int classId = classAndMethod >> 16; + int methodId = classAndMethod & 0xFFFF; + methodProcessor.setCurrentMethod(classId, methodId); + try + { + switch (classAndMethod) + { + //CONNECTION_CLASS: + case 0x000a000a: + ConnectionStartBody.process(in, methodProcessor); + break; + case 0x000a0014: + ConnectionSecureBody.process(in, methodProcessor); + break; + case 0x000a001e: + ConnectionTuneBody.process(in, methodProcessor); + break; + case 0x000a0029: + ConnectionOpenOkBody.process(in, methodProcessor); + break; + case 0x000a002a: + ConnectionRedirectBody.process(in, methodProcessor); + break; + case 0x000a0032: + if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0)) + { + ConnectionRedirectBody.process(in, methodProcessor); + } + else + { + ConnectionCloseBody.process(in, methodProcessor); + } + break; + case 0x000a0033: + if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0)) + { + throw newUnknownMethodException(classId, methodId, + methodProcessor.getProtocolVersion()); + } + else + { + methodProcessor.receiveConnectionCloseOk(); + } + break; + case 0x000a003c: + if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0)) + { + ConnectionCloseBody.process(in, methodProcessor); + } + else + { + throw newUnknownMethodException(classId, methodId, + methodProcessor.getProtocolVersion()); + } + break; + case 0x000a003d: + if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0)) + { + methodProcessor.receiveConnectionCloseOk(); + } + else + { + throw newUnknownMethodException(classId, methodId, + methodProcessor.getProtocolVersion()); + } + break; + + // CHANNEL_CLASS: + + case 0x0014000b: + ChannelOpenOkBody.process(in, methodProcessor.getProtocolVersion(), channelMethodProcessor); + break; + case 0x00140014: + ChannelFlowBody.process(in, channelMethodProcessor); + break; + case 0x00140015: + ChannelFlowOkBody.process(in, channelMethodProcessor); + break; + case 0x0014001e: + ChannelAlertBody.process(in, channelMethodProcessor); + break; + case 0x00140028: + ChannelCloseBody.process(in, channelMethodProcessor); + break; + case 0x00140029: + channelMethodProcessor.receiveChannelCloseOk(); + break; + + // ACCESS_CLASS: + + case 0x001e000b: + AccessRequestOkBody.process(in, channelMethodProcessor); + break; + + // EXCHANGE_CLASS: + + case 0x0028000b: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveExchangeDeclareOk(); + } + break; + case 0x00280015: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveExchangeDeleteOk(); + } + break; + case 0x00280017: + ExchangeBoundOkBody.process(in, channelMethodProcessor); + break; + + + // QUEUE_CLASS: + + case 0x0032000b: + QueueDeclareOkBody.process(in, channelMethodProcessor); + break; + case 0x00320015: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveQueueBindOk(); + } + break; + case 0x0032001f: + QueuePurgeOkBody.process(in, channelMethodProcessor); + break; + case 0x00320029: + QueueDeleteOkBody.process(in, channelMethodProcessor); + break; + case 0x00320033: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveQueueUnbindOk(); + } + break; + + + // BASIC_CLASS: + + case 0x003c000b: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveBasicQosOk(); + } + break; + case 0x003c0015: + BasicConsumeOkBody.process(in, channelMethodProcessor); + break; + case 0x003c001f: + BasicCancelOkBody.process(in, channelMethodProcessor); + break; + case 0x003c0032: + BasicReturnBody.process(in, channelMethodProcessor); + break; + case 0x003c003c: + BasicDeliverBody.process(in, channelMethodProcessor); + break; + case 0x003c0047: + BasicGetOkBody.process(in, channelMethodProcessor); + break; + case 0x003c0048: + BasicGetEmptyBody.process(in, channelMethodProcessor); + break; + case 0x003c0065: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveBasicRecoverSyncOk(); + } + break; + case 0x003c006f: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveBasicRecoverSyncOk(); + } + break; + + // TX_CLASS: + + case 0x005a000b: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveTxSelectOk(); + } + break; + case 0x005a0015: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveTxCommitOk(); + } + break; + case 0x005a001f: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveTxRollbackOk(); + } + break; + + default: + throw newUnknownMethodException(classId, methodId, + methodProcessor.getProtocolVersion()); + + } + } + finally + { + methodProcessor.setCurrentMethod(0, 0); + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java new file mode 100644 index 0000000000..3b138ba278 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java @@ -0,0 +1,234 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.codec; + +import java.io.IOException; + +import org.apache.qpid.framing.*; + +public class ServerDecoder extends AMQDecoder<ServerMethodProcessor<? extends ServerChannelMethodProcessor>> +{ + + /** + * Creates a new AMQP decoder. + * + * @param methodProcessor method processor + */ + public ServerDecoder(final ServerMethodProcessor<? extends ServerChannelMethodProcessor> methodProcessor) + { + super(true, methodProcessor); + } + + void processMethod(int channelId, + MarkableDataInput in) + throws AMQFrameDecodingException, IOException + { + ServerMethodProcessor<? extends ServerChannelMethodProcessor> methodProcessor = getMethodProcessor(); + ServerChannelMethodProcessor channelMethodProcessor = methodProcessor.getChannelMethodProcessor(channelId); + final int classAndMethod = in.readInt(); + int classId = classAndMethod >> 16; + int methodId = classAndMethod & 0xFFFF; + methodProcessor.setCurrentMethod(classId, methodId); + try + { + switch (classAndMethod) + { + //CONNECTION_CLASS: + case 0x000a000b: + ConnectionStartOkBody.process(in, methodProcessor); + break; + case 0x000a0015: + ConnectionSecureOkBody.process(in, methodProcessor); + break; + case 0x000a001f: + ConnectionTuneOkBody.process(in, methodProcessor); + break; + case 0x000a0028: + ConnectionOpenBody.process(in, methodProcessor); + break; + case 0x000a0032: + if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0)) + { + throw newUnknownMethodException(classId, methodId, + methodProcessor.getProtocolVersion()); + } + else + { + ConnectionCloseBody.process(in, methodProcessor); + } + break; + case 0x000a0033: + if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0)) + { + throw newUnknownMethodException(classId, methodId, + methodProcessor.getProtocolVersion()); + } + else + { + methodProcessor.receiveConnectionCloseOk(); + } + break; + case 0x000a003c: + if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0)) + { + ConnectionCloseBody.process(in, methodProcessor); + } + else + { + throw newUnknownMethodException(classId, methodId, + methodProcessor.getProtocolVersion()); + } + break; + case 0x000a003d: + if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0)) + { + methodProcessor.receiveConnectionCloseOk(); + } + else + { + throw newUnknownMethodException(classId, methodId, + methodProcessor.getProtocolVersion()); + } + break; + + // CHANNEL_CLASS: + + case 0x0014000a: + ChannelOpenBody.process(channelId, in, methodProcessor); + break; + case 0x00140014: + ChannelFlowBody.process(in, channelMethodProcessor); + break; + case 0x00140015: + ChannelFlowOkBody.process(in, channelMethodProcessor); + break; + case 0x00140028: + ChannelCloseBody.process(in, channelMethodProcessor); + break; + case 0x00140029: + channelMethodProcessor.receiveChannelCloseOk(); + break; + + // ACCESS_CLASS: + + case 0x001e000a: + AccessRequestBody.process(in, channelMethodProcessor); + break; + + // EXCHANGE_CLASS: + + case 0x0028000a: + ExchangeDeclareBody.process(in, channelMethodProcessor); + break; + case 0x00280014: + ExchangeDeleteBody.process(in, channelMethodProcessor); + break; + case 0x00280016: + ExchangeBoundBody.process(in, channelMethodProcessor); + break; + + + // QUEUE_CLASS: + + case 0x0032000a: + QueueDeclareBody.process(in, channelMethodProcessor); + break; + case 0x00320014: + QueueBindBody.process(in, channelMethodProcessor); + break; + case 0x0032001e: + QueuePurgeBody.process(in, channelMethodProcessor); + break; + case 0x00320028: + QueueDeleteBody.process(in, channelMethodProcessor); + break; + case 0x00320032: + QueueUnbindBody.process(in, channelMethodProcessor); + break; + + + // BASIC_CLASS: + + case 0x003c000a: + BasicQosBody.process(in, channelMethodProcessor); + break; + case 0x003c0014: + BasicConsumeBody.process(in, channelMethodProcessor); + break; + case 0x003c001e: + BasicCancelBody.process(in, channelMethodProcessor); + break; + case 0x003c0028: + BasicPublishBody.process(in, channelMethodProcessor); + break; + case 0x003c0046: + BasicGetBody.process(in, channelMethodProcessor); + break; + case 0x003c0050: + BasicAckBody.process(in, channelMethodProcessor); + break; + case 0x003c005a: + BasicRejectBody.process(in, channelMethodProcessor); + break; + case 0x003c0064: + BasicRecoverBody.process(in, methodProcessor.getProtocolVersion(), channelMethodProcessor); + break; + case 0x003c0066: + BasicRecoverSyncBody.process(in, channelMethodProcessor); + break; + case 0x003c006e: + BasicRecoverSyncBody.process(in, channelMethodProcessor); + break; + + // TX_CLASS: + + case 0x005a000a: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveTxSelect(); + } + break; + case 0x005a0014: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveTxCommit(); + } + break; + case 0x005a001e: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveTxRollback(); + } + break; + + default: + throw newUnknownMethodException(classId, methodId, + methodProcessor.getProtocolVersion()); + + } + } + finally + { + methodProcessor.setCurrentMethod(0, 0); + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java deleted file mode 100644 index 49a88b6bc1..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ /dev/null @@ -1,404 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -import java.io.IOException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.codec.MarkableDataInput; -import org.apache.qpid.protocol.AMQConstant; - -public class AMQDataBlockDecoder -{ - - private Logger _logger = LoggerFactory.getLogger(AMQDataBlockDecoder.class); - private int _maxFrameSize = AMQConstant.FRAME_MIN_SIZE.getCode(); - - public AMQDataBlockDecoder() - { - } - - public boolean decodable(MarkableDataInput in) throws AMQFrameDecodingException, IOException - { - final int remainingAfterAttributes = in.available() - (1 + 2 + 4 + 1); - // type, channel, body length and end byte - if (remainingAfterAttributes < 0) - { - return false; - } - - in.mark(8); - in.skip(1 + 2); - - - // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt() - final long bodySize = in.readInt() & 0xffffffffL; - if (bodySize > _maxFrameSize) - { - throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, - "Incoming frame size of " - + bodySize - + " is larger than negotiated maximum of " - + _maxFrameSize); - } - in.reset(); - - return (remainingAfterAttributes >= bodySize); - - } - - public void processInput(MethodProcessor processor, - MarkableDataInput in) - throws AMQFrameDecodingException, AMQProtocolVersionException, IOException - { - final byte type = in.readByte(); - - final int channel = in.readUnsignedShort(); - final long bodySize = EncodingUtils.readUnsignedInteger(in); - - // bodySize can be zero - if ((channel < 0) || (bodySize < 0)) - { - throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, - "Undecodable frame: type = " + type + " channel = " + channel - + " bodySize = " + bodySize); - } - - switch (type) - { - case 1: - processMethod(channel, in, processor); - break; - case 2: - ContentHeaderBody.process(channel, in, processor, bodySize); - break; - case 3: - ContentBody.process(channel, in, processor, bodySize); - break; - case 8: - HeartbeatBody.process(channel, in, processor, bodySize); - break; - default: - throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Unsupported frame type: " + type); - } - - byte marker = in.readByte(); - if ((marker & 0xFF) != 0xCE) - { - throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, - "End of frame marker not found. Read " + marker + " length=" + bodySize - + " type=" + type); - } - - } - - public void setMaxFrameSize(final int maxFrameSize) - { - _maxFrameSize = maxFrameSize; - } - - private void processMethod(int channelId, - MarkableDataInput in, - MethodProcessor dispatcher) - throws AMQFrameDecodingException, IOException - { - final int classAndMethod = in.readInt(); - int classId = classAndMethod >> 16; - int methodId = classAndMethod & 0xFFFF; - dispatcher.setCurrentMethod(classId, methodId); - try - { - switch (classAndMethod) - { - //CONNECTION_CLASS: - case 0x000a000a: - ConnectionStartBody.process(in, dispatcher); - break; - case 0x000a000b: - ConnectionStartOkBody.process(in, dispatcher); - break; - case 0x000a0014: - ConnectionSecureBody.process(in, dispatcher); - break; - case 0x000a0015: - ConnectionSecureOkBody.process(in, dispatcher); - break; - case 0x000a001e: - ConnectionTuneBody.process(in, dispatcher); - break; - case 0x000a001f: - ConnectionTuneOkBody.process(in, dispatcher); - break; - case 0x000a0028: - ConnectionOpenBody.process(in, dispatcher); - break; - case 0x000a0029: - ConnectionOpenOkBody.process(in, dispatcher); - break; - case 0x000a002a: - ConnectionRedirectBody.process(in, dispatcher); - break; - case 0x000a0032: - if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) - { - ConnectionRedirectBody.process(in, dispatcher); - } - else - { - ConnectionCloseBody.process(in, dispatcher); - } - break; - case 0x000a0033: - if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) - { - throw newUnknownMethodException(classId, methodId, - dispatcher.getProtocolVersion()); - } - else - { - dispatcher.receiveConnectionCloseOk(); - } - break; - case 0x000a003c: - if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) - { - ConnectionCloseBody.process(in, dispatcher); - } - else - { - throw newUnknownMethodException(classId, methodId, - dispatcher.getProtocolVersion()); - } - break; - case 0x000a003d: - if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) - { - dispatcher.receiveConnectionCloseOk(); - } - else - { - throw newUnknownMethodException(classId, methodId, - dispatcher.getProtocolVersion()); - } - break; - - // CHANNEL_CLASS: - - case 0x0014000a: - ChannelOpenBody.process(channelId, in, dispatcher); - break; - case 0x0014000b: - ChannelOpenOkBody.process(channelId, in, dispatcher.getProtocolVersion(), dispatcher); - break; - case 0x00140014: - ChannelFlowBody.process(channelId, in, dispatcher); - break; - case 0x00140015: - ChannelFlowOkBody.process(channelId, in, dispatcher); - break; - case 0x0014001e: - ChannelAlertBody.process(channelId, in, dispatcher); - break; - case 0x00140028: - ChannelCloseBody.process(channelId, in, dispatcher); - break; - case 0x00140029: - dispatcher.receiveChannelCloseOk(channelId); - break; - - // ACCESS_CLASS: - - case 0x001e000a: - AccessRequestBody.process(channelId, in, dispatcher); - break; - case 0x001e000b: - AccessRequestOkBody.process(channelId, in, dispatcher); - break; - - // EXCHANGE_CLASS: - - case 0x0028000a: - ExchangeDeclareBody.process(channelId, in, dispatcher); - break; - case 0x0028000b: - dispatcher.receiveExchangeDeclareOk(channelId); - break; - case 0x00280014: - ExchangeDeleteBody.process(channelId, in, dispatcher); - break; - case 0x00280015: - dispatcher.receiveExchangeDeleteOk(channelId); - break; - case 0x00280016: - ExchangeBoundBody.process(channelId, in, dispatcher); - break; - case 0x00280017: - ExchangeBoundOkBody.process(channelId, in, dispatcher); - break; - - - // QUEUE_CLASS: - - case 0x0032000a: - QueueDeclareBody.process(channelId, in, dispatcher); - break; - case 0x0032000b: - QueueDeclareOkBody.process(channelId, in, dispatcher); - break; - case 0x00320014: - QueueBindBody.process(channelId, in, dispatcher); - break; - case 0x00320015: - dispatcher.receiveQueueBindOk(channelId); - break; - case 0x0032001e: - QueuePurgeBody.process(channelId, in, dispatcher); - break; - case 0x0032001f: - QueuePurgeOkBody.process(channelId, in, dispatcher); - break; - case 0x00320028: - QueueDeleteBody.process(channelId, in, dispatcher); - break; - case 0x00320029: - QueueDeleteOkBody.process(channelId, in, dispatcher); - break; - case 0x00320032: - QueueUnbindBody.process(channelId, in, dispatcher); - break; - case 0x00320033: - dispatcher.receiveQueueUnbindOk(channelId); - break; - - - // BASIC_CLASS: - - case 0x003c000a: - BasicQosBody.process(channelId, in, dispatcher); - break; - case 0x003c000b: - dispatcher.receiveBasicQosOk(channelId); - break; - case 0x003c0014: - BasicConsumeBody.process(channelId, in, dispatcher); - break; - case 0x003c0015: - BasicConsumeOkBody.process(channelId, in, dispatcher); - break; - case 0x003c001e: - BasicCancelBody.process(channelId, in, dispatcher); - break; - case 0x003c001f: - BasicCancelOkBody.process(channelId, in, dispatcher); - break; - case 0x003c0028: - BasicPublishBody.process(channelId, in, dispatcher); - break; - case 0x003c0032: - BasicReturnBody.process(channelId, in, dispatcher); - break; - case 0x003c003c: - BasicDeliverBody.process(channelId, in, dispatcher); - break; - case 0x003c0046: - BasicGetBody.process(channelId, in, dispatcher); - break; - case 0x003c0047: - BasicGetOkBody.process(channelId, in, dispatcher); - break; - case 0x003c0048: - BasicGetEmptyBody.process(channelId, in, dispatcher); - break; - case 0x003c0050: - BasicAckBody.process(channelId, in, dispatcher); - break; - case 0x003c005a: - BasicRejectBody.process(channelId, in, dispatcher); - break; - case 0x003c0064: - BasicRecoverBody.process(channelId, in, dispatcher.getProtocolVersion(), dispatcher); - break; - case 0x003c0065: - dispatcher.receiveBasicRecoverSyncOk(channelId); - break; - case 0x003c0066: - BasicRecoverSyncBody.process(channelId, in, dispatcher); - break; - case 0x003c006e: - BasicRecoverSyncBody.process(channelId, in, dispatcher); - break; - case 0x003c006f: - dispatcher.receiveBasicRecoverSyncOk(channelId); - break; - - // TX_CLASS: - - case 0x005a000a: - dispatcher.receiveTxSelect(channelId); - break; - case 0x005a000b: - dispatcher.receiveTxSelectOk(channelId); - break; - case 0x005a0014: - dispatcher.receiveTxCommit(channelId); - break; - case 0x005a0015: - dispatcher.receiveTxCommitOk(channelId); - break; - case 0x005a001e: - dispatcher.receiveTxRollback(channelId); - break; - case 0x005a001f: - dispatcher.receiveTxRollbackOk(channelId); - break; - - default: - throw newUnknownMethodException(classId, methodId, - dispatcher.getProtocolVersion()); - - } - } - finally - { - dispatcher.setCurrentMethod(0,0); - } - } - - private AMQFrameDecodingException newUnknownMethodException(final int classId, - final int methodId, - ProtocolVersion protocolVersion) - { - return new AMQFrameDecodingException(AMQConstant.COMMAND_INVALID, - "Method " - + methodId - + " unknown in AMQP version " - + protocolVersion - + " (while trying to decode class " - + classId - + " method " - + methodId - + "."); - } - - -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java index ce2a5a1317..8dec50c400 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java @@ -165,9 +165,8 @@ public class AccessRequestBody extends AMQMethodBodyImpl implements EncodableAMQ return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException { AMQShortString realm = buffer.readAMQShortString(); byte bitfield = buffer.readByte(); @@ -176,6 +175,9 @@ public class AccessRequestBody extends AMQMethodBodyImpl implements EncodableAMQ boolean active = (bitfield & 0x04) == 0x4 ; boolean write = (bitfield & 0x08) == 0x8 ; boolean read = (bitfield & 0x10) == 0x10 ; - dispatcher.receiveAccessRequest(channelId, realm, exclusive, passive, active, write, read); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveAccessRequest(realm, exclusive, passive, active, write, read); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java index 10be4d45c8..7ed0b3602b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java @@ -95,10 +95,14 @@ public class AccessRequestOkBody extends AMQMethodBodyImpl implements EncodableA return buf.toString(); } - public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput buffer, + final ClientChannelMethodProcessor dispatcher) throws IOException { int ticket = buffer.readUnsignedShort(); - dispatcher.receiveAccessRequestOk(channelId, ticket); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveAccessRequestOk(ticket); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java index 70e3f10148..68782231fe 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java @@ -112,13 +112,15 @@ public class BasicAckBody extends AMQMethodBodyImpl implements EncodableAMQDataB return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException { long deliveryTag = buffer.readLong(); boolean multiple = (buffer.readByte() & 0x01) != 0; - dispatcher.receiveBasicAck(channelId, deliveryTag, multiple); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicAck(deliveryTag, multiple); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java index 6f74b3870a..c9a870e2a5 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java @@ -113,13 +113,15 @@ public class BasicCancelBody extends AMQMethodBodyImpl implements EncodableAMQDa return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException { AMQShortString consumerTag = buffer.readAMQShortString(); boolean noWait = (buffer.readByte() & 0x01) == 0x01; - dispatcher.receiveBasicCancel(channelId, consumerTag, noWait); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicCancel(consumerTag, noWait); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java index 0e9bc52d66..8d16aa44ec 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java @@ -96,10 +96,14 @@ public class BasicCancelOkBody extends AMQMethodBodyImpl implements EncodableAMQ return buf.toString(); } - public static void process(final int channelId, final MarkableDataInput in, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput in, + final ClientChannelMethodProcessor dispatcher) throws IOException { AMQShortString consumerTag = in.readAMQShortString(); - dispatcher.receiveBasicCancelOk(channelId, consumerTag); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicCancelOk(consumerTag); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java index 94396418fe..502fa07e78 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java @@ -191,7 +191,8 @@ public class BasicConsumeBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } - public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { @@ -205,6 +206,9 @@ public class BasicConsumeBody extends AMQMethodBodyImpl implements EncodableAMQD boolean exclusive = (bitfield & 0x04) == 0x04; boolean nowait = (bitfield & 0x08) == 0x08; FieldTable arguments = EncodingUtils.readFieldTable(buffer); - dispatcher.receiveBasicConsume(channelId, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicConsume(queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java index d42c722fdf..d3df7f222a 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java @@ -96,10 +96,14 @@ public class BasicConsumeOkBody extends AMQMethodBodyImpl implements EncodableAM return buf.toString(); } - public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput buffer, + final ClientChannelMethodProcessor dispatcher) throws IOException { AMQShortString consumerTag = buffer.readAMQShortString(); - dispatcher.receiveBasicConsumeOk(channelId, consumerTag); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicConsumeOk(consumerTag); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java index afa38d1852..f61ee2d55b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java @@ -152,9 +152,8 @@ public class BasicDeliverBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ClientChannelMethodProcessor dispatcher) throws IOException { AMQShortString consumerTag = buffer.readAMQShortString(); @@ -162,6 +161,9 @@ public class BasicDeliverBody extends AMQMethodBodyImpl implements EncodableAMQD boolean redelivered = (buffer.readByte() & 0x01) != 0; AMQShortString exchange = buffer.readAMQShortString(); AMQShortString routingKey = buffer.readAMQShortString(); - dispatcher.receiveBasicDeliver(channelId, consumerTag, deliveryTag, redelivered, exchange, routingKey); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java index 93429b97d8..68a6f2980b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java @@ -125,13 +125,17 @@ public class BasicGetBody extends AMQMethodBodyImpl implements EncodableAMQDataB return buf.toString(); } - public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException { int ticket = buffer.readUnsignedShort(); AMQShortString queue = buffer.readAMQShortString(); boolean noAck = (buffer.readByte() & 0x01) != 0; - dispatcher.receiveBasicGet(channelId, queue, noAck); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicGet(queue, noAck); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java index a42df6bcc7..f37fb632db 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java @@ -96,11 +96,13 @@ public class BasicGetEmptyBody extends AMQMethodBodyImpl implements EncodableAMQ return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ClientChannelMethodProcessor dispatcher) throws IOException { AMQShortString clusterId = buffer.readAMQShortString(); - dispatcher.receiveBasicGetEmpty(channelId); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicGetEmpty(); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java index b8af656a35..37e9bdae5a 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java @@ -151,15 +151,17 @@ public class BasicGetOkBody extends AMQMethodBodyImpl implements EncodableAMQDat return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ClientChannelMethodProcessor dispatcher) throws IOException { long deliveryTag = buffer.readLong(); boolean redelivered = (buffer.readByte() & 0x01) != 0; AMQShortString exchange = buffer.readAMQShortString(); AMQShortString routingKey = buffer.readAMQShortString(); long messageCount = EncodingUtils.readUnsignedInteger(buffer); - dispatcher.receiveBasicGetOk(channelId, deliveryTag, redelivered, exchange, routingKey, messageCount); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicGetOk(deliveryTag, redelivered, exchange, routingKey, messageCount); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java index 910942c2f1..8e5d71a804 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java @@ -151,9 +151,8 @@ public class BasicPublishBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException { int ticket = buffer.readUnsignedShort(); @@ -163,6 +162,9 @@ public class BasicPublishBody extends AMQMethodBodyImpl implements EncodableAMQD boolean mandatory = (bitfield & 0x01) != 0; boolean immediate = (bitfield & 0x02) != 0; - dispatcher.receiveBasicPublish(channelId, exchange, routingKey, mandatory, immediate); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicPublish(exchange, routingKey, mandatory, immediate); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java index fb6b6956c6..6b7e90f41f 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java @@ -124,14 +124,16 @@ public class BasicQosBody extends AMQMethodBodyImpl implements EncodableAMQDataB return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException { long prefetchSize = EncodingUtils.readUnsignedInteger(buffer); int prefetchCount = buffer.readUnsignedShort(); boolean global = (buffer.readByte() & 0x01) == 0x01; - dispatcher.receiveBasicQos(channelId, prefetchSize, prefetchCount, global); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicQos(prefetchSize, prefetchCount, global); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java index 2519f25fbe..e5490c4827 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java @@ -100,14 +100,16 @@ public class BasicRecoverBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput in, - final ProtocolVersion protocolVersion, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput in, + final ProtocolVersion protocolVersion, + final ServerChannelMethodProcessor dispatcher) throws IOException { boolean requeue = (in.readByte() & 0x01) == 0x01; boolean sync = (ProtocolVersion.v8_0.equals(protocolVersion)); - dispatcher.receiveBasicRecover(channelId, requeue, sync); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicRecover(requeue, sync); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java index 16c9798977..f82ee78862 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java @@ -103,11 +103,13 @@ public class BasicRecoverSyncBody extends AMQMethodBodyImpl implements Encodable return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput in, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput in, + final ServerChannelMethodProcessor dispatcher) throws IOException { boolean requeue = (in.readByte() & 0x01) == 0x01; - dispatcher.receiveBasicRecover(channelId, requeue, true); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicRecover(requeue, true); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java index 8e1ebf4013..8c8757f1d2 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java @@ -112,13 +112,15 @@ public class BasicRejectBody extends AMQMethodBodyImpl implements EncodableAMQDa return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException { long deliveryTag = buffer.readLong(); boolean requeue = (buffer.readByte() & 0x01) != 0; - dispatcher.receiveBasicReject(channelId, deliveryTag, requeue); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicReject(deliveryTag, requeue); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java index cff9914705..afdb343c9f 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java @@ -134,15 +134,17 @@ public class BasicReturnBody extends AMQMethodBodyImpl implements EncodableAMQDa return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ClientChannelMethodProcessor dispatcher) throws IOException { int replyCode = buffer.readUnsignedShort(); AMQShortString replyText = buffer.readAMQShortString(); AMQShortString exchange = buffer.readAMQShortString(); AMQShortString routingKey = buffer.readAMQShortString(); - dispatcher.receiveBasicReturn(channelId, replyCode, replyText, exchange, routingKey); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicReturn(replyCode, replyText, exchange, routingKey); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java index 11dcffc175..289cf2cc10 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java @@ -121,13 +121,17 @@ public class ChannelAlertBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } - public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput buffer, + final ClientChannelMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { int replyCode = buffer.readUnsignedShort(); AMQShortString replyText = buffer.readAMQShortString(); FieldTable details = EncodingUtils.readFieldTable(buffer); - dispatcher.receiveChannelAlert(channelId, replyCode, replyText, details); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveChannelAlert(replyCode, replyText, details); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java index a4f54fbe7d..a3b92a1fad 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java @@ -132,15 +132,17 @@ public class ChannelCloseBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ChannelMethodProcessor dispatcher) throws IOException { int replyCode = buffer.readUnsignedShort(); AMQShortString replyText = buffer.readAMQShortString(); int classId = buffer.readUnsignedShort(); int methodId = buffer.readUnsignedShort(); - dispatcher.receiveChannelClose(channelId, replyCode, replyText, classId, methodId); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveChannelClose(replyCode, replyText, classId, methodId); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java index c975744d9f..1c3cc47d4e 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java @@ -92,11 +92,13 @@ public class ChannelFlowBody extends AMQMethodBodyImpl implements EncodableAMQDa return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ChannelMethodProcessor dispatcher) throws IOException { boolean active = (buffer.readByte() & 0x01) == 0x01; - dispatcher.receiveChannelFlow(channelId, active); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveChannelFlow(active); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java index a62c6155f8..9d4a2b09a1 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java @@ -93,10 +93,14 @@ public class ChannelFlowOkBody extends AMQMethodBodyImpl implements EncodableAMQ return buf.toString(); } - public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput buffer, + final ChannelMethodProcessor dispatcher) throws IOException { boolean active = (buffer.readByte() & 0x01) == 0x01; - dispatcher.receiveChannelFlowOk(channelId, active); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveChannelFlowOk(active); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java new file mode 100644 index 0000000000..84cd1e13c2 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +public interface ChannelMethodProcessor +{ + void receiveChannelFlow(boolean active); + + void receiveChannelFlowOk(boolean active); + + void receiveChannelClose(int replyCode, AMQShortString replyText, int classId, int methodId); + + void receiveChannelCloseOk(); + + void receiveMessageContent(byte[] data); + + void receiveMessageHeader(BasicContentHeaderProperties properties, long bodySize); + + boolean ignoreAllButCloseOk(); +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java index 9da45d3d70..af583f5fda 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java @@ -84,9 +84,12 @@ public class ChannelOpenBody extends AMQMethodBodyImpl implements EncodableAMQDa public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + final ServerMethodProcessor dispatcher) throws IOException { buffer.readAMQShortString(); - dispatcher.receiveChannelOpen(channelId); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveChannelOpen(channelId); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java index 775a08fbd4..e3b4f38a8c 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java @@ -96,16 +96,18 @@ public class ChannelOpenOkBody extends AMQMethodBodyImpl implements EncodableAMQ return "[ChannelOpenOkBody]"; } - public static void process(final int channelId, - final MarkableDataInput in, - final ProtocolVersion protocolVersion, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput in, + final ProtocolVersion protocolVersion, + final ClientChannelMethodProcessor dispatcher) throws IOException { if(!ProtocolVersion.v8_0.equals(protocolVersion)) { EncodingUtils.readBytes(in); } - dispatcher.receiveChannelOpenOk(channelId); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveChannelOpenOk(); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java new file mode 100644 index 0000000000..bef143e39b --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +public interface ClientChannelMethodProcessor extends ChannelMethodProcessor +{ + void receiveChannelOpenOk(); + + void receiveChannelAlert(int replyCode, final AMQShortString replyText, FieldTable details); + + void receiveAccessRequestOk(int ticket); + + void receiveExchangeDeclareOk(); + + void receiveExchangeDeleteOk(); + + void receiveExchangeBoundOk(int replyCode, AMQShortString replyText); + + void receiveQueueBindOk(); + + void receiveQueueUnbindOk(); + + void receiveQueueDeclareOk(final AMQShortString queue, long messageCount, long consumerCount); + + void receiveQueuePurgeOk(long messageCount); + + void receiveQueueDeleteOk(long messageCount); + + void receiveBasicRecoverSyncOk(); + + void receiveBasicQosOk(); + + void receiveBasicConsumeOk(AMQShortString consumerTag); + + void receiveBasicCancelOk(AMQShortString consumerTag); + + void receiveBasicReturn(int replyCode, + AMQShortString replyText, + AMQShortString exchange, + AMQShortString routingKey); + + void receiveBasicDeliver(AMQShortString consumerTag, + long deliveryTag, + boolean redelivered, + AMQShortString exchange, AMQShortString routingKey); + + void receiveBasicGetOk(long deliveryTag, + boolean redelivered, + AMQShortString exchange, + AMQShortString routingKey, long messageCount); + + void receiveBasicGetEmpty(); + + void receiveTxSelectOk(); + + void receiveTxCommitOk(); + + void receiveTxRollbackOk(); + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ClientMethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/ClientMethodProcessor.java new file mode 100644 index 0000000000..0b599ee40a --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/ClientMethodProcessor.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +public interface ClientMethodProcessor<T extends ClientChannelMethodProcessor> extends MethodProcessor<T> +{ + void receiveConnectionStart(short versionMajor, + short versionMinor, + FieldTable serverProperties, + byte[] mechanisms, + byte[] locales); + + void receiveConnectionSecure(byte[] challenge); + + void receiveConnectionRedirect(AMQShortString host, AMQShortString knownHosts); + + void receiveConnectionTune(int channelMax, long frameMax, int heartbeat); + + void receiveConnectionOpenOk(AMQShortString knownHosts); + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java index 0e685deb7c..7fb815ae40 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java @@ -121,12 +121,15 @@ public class ConnectionOpenBody extends AMQMethodBodyImpl implements EncodableAM return buf.toString(); } - public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, final ServerMethodProcessor dispatcher) throws IOException { AMQShortString virtualHost = buffer.readAMQShortString(); AMQShortString capabilities = buffer.readAMQShortString(); boolean insist = (buffer.readByte() & 0x01) == 0x01; - dispatcher.receiveConnectionOpen(virtualHost, capabilities, insist); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveConnectionOpen(virtualHost, capabilities, insist); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java index 6d1e80c624..95c48873f3 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java @@ -96,10 +96,13 @@ public class ConnectionOpenOkBody extends AMQMethodBodyImpl implements Encodable return buf.toString(); } - public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, final ClientMethodProcessor dispatcher) throws IOException { AMQShortString knownHosts = buffer.readAMQShortString(); - dispatcher.receiveConnectionOpenOk(knownHosts); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveConnectionOpenOk(knownHosts); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java index a9b9a43b1a..491cc25125 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java @@ -108,10 +108,13 @@ public class ConnectionRedirectBody extends AMQMethodBodyImpl implements Encodab return buf.toString(); } - public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, final ClientMethodProcessor dispatcher) throws IOException { AMQShortString host = buffer.readAMQShortString(); AMQShortString knownHosts = buffer.readAMQShortString(); - dispatcher.receiveConnectionRedirect(host, knownHosts); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveConnectionRedirect(host, knownHosts); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java index 1f7f2b0221..e10af3b4c1 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java @@ -96,11 +96,14 @@ public class ConnectionSecureBody extends AMQMethodBodyImpl implements Encodable return buf.toString(); } - public static void process(final MarkableDataInput in, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput in, final ClientMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { byte[] challenge = EncodingUtils.readBytes(in); - dispatcher.receiveConnectionSecure(challenge); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveConnectionSecure(challenge); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java index 9a4668a9c7..4c4a249bb6 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java @@ -96,9 +96,12 @@ public class ConnectionSecureOkBody extends AMQMethodBodyImpl implements Encodab return buf.toString(); } - public static void process(final MarkableDataInput in, final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput in, final ServerMethodProcessor dispatcher) throws IOException { byte[] response = EncodingUtils.readBytes(in); - dispatcher.receiveConnectionSecureOk(response); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveConnectionSecureOk(response); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java index 4f47f0632f..3b94919d4e 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java @@ -136,7 +136,7 @@ public class ConnectionStartBody extends AMQMethodBodyImpl implements EncodableA return buf.toString(); } - public static void process(final MarkableDataInput in, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput in, final ClientMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { short versionMajor = (short) in.readUnsignedByte(); @@ -145,7 +145,9 @@ public class ConnectionStartBody extends AMQMethodBodyImpl implements EncodableA byte[] mechanisms = EncodingUtils.readBytes(in); byte[] locales = EncodingUtils.readBytes(in); - - dispatcher.receiveConnectionStart(versionMajor, versionMinor, serverProperties, mechanisms, locales); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveConnectionStart(versionMajor, versionMinor, serverProperties, mechanisms, locales); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java index da3d0a2c56..5b6a8e3ef7 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java @@ -126,7 +126,7 @@ public class ConnectionStartOkBody extends AMQMethodBodyImpl implements Encodabl return buf.toString(); } - public static void process(final MarkableDataInput in, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput in, final ServerMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { @@ -134,7 +134,9 @@ public class ConnectionStartOkBody extends AMQMethodBodyImpl implements Encodabl AMQShortString mechanism = in.readAMQShortString(); byte[] response = EncodingUtils.readBytes(in); AMQShortString locale = in.readAMQShortString(); - - dispatcher.receiveConnectionStartOk(clientProperties, mechanism, response, locale); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveConnectionStartOk(clientProperties, mechanism, response, locale); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java index 3383fd889a..04def21d44 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java @@ -119,12 +119,15 @@ public class ConnectionTuneBody extends AMQMethodBodyImpl implements EncodableAM return buf.toString(); } - public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, final ClientMethodProcessor dispatcher) throws IOException { int channelMax = buffer.readUnsignedShort(); long frameMax = EncodingUtils.readUnsignedInteger(buffer); int heartbeat = buffer.readUnsignedShort(); - dispatcher.receiveConnectionTune(channelMax, frameMax, heartbeat); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveConnectionTune(channelMax, frameMax, heartbeat); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java index f695eda2c4..3141a85766 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java @@ -119,12 +119,15 @@ public class ConnectionTuneOkBody extends AMQMethodBodyImpl implements Encodable return buf.toString(); } - public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, final ServerMethodProcessor dispatcher) throws IOException { int channelMax = buffer.readUnsignedShort(); long frameMax = EncodingUtils.readUnsignedInteger(buffer); int heartbeat = buffer.readUnsignedShort(); - dispatcher.receiveConnectionTuneOk(channelMax, frameMax, heartbeat); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveConnectionTuneOk(channelMax, frameMax, heartbeat); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index 01beb3af77..4d9826d83c 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -21,7 +21,6 @@ package org.apache.qpid.framing; import java.io.DataInput; -import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; @@ -73,33 +72,20 @@ public class ContentBody implements AMQBody session.contentBodyReceived(channelId, this); } - protected void populateFromBuffer(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException - { - if (size > 0) - { - _payload = new byte[(int)size]; - buffer.read(getPayload()); - } - - } - - public void reduceBufferToFit() - { - } - public byte[] getPayload() { return _payload; } - public static void process(final int channel, - final MarkableDataInput in, - final MethodProcessor methodProcessor, final long bodySize) + public static void process(final MarkableDataInput in, + final ChannelMethodProcessor methodProcessor, final long bodySize) throws IOException { + byte[] payload = new byte[(int)bodySize]; in.readFully(payload); - methodProcessor.receiveMessageContent(channel, payload); + + methodProcessor.receiveMessageContent(payload); } private static class BufferContentBody implements AMQBody diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java index 0d54e09ae5..0d25e4dfba 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -155,9 +155,8 @@ public class ContentHeaderBody implements AMQBody _bodySize = bodySize; } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor methodProcessor, final long size) + public static void process(final MarkableDataInput buffer, + final ChannelMethodProcessor methodProcessor, final long size) throws IOException, AMQFrameDecodingException { @@ -168,13 +167,13 @@ public class ContentHeaderBody implements AMQBody BasicContentHeaderProperties properties; - if (classId != BasicConsumeBody.CLASS_ID) + if (classId != CLASS_ID) { throw new AMQFrameDecodingException(null, "Unsupported content header class id: " + classId, null); } - properties = new BasicContentHeaderProperties(); + properties = new BasicContentHeaderProperties(); properties.populatePropertiesFromBuffer(buffer, propertyFlags, (int)(size-14)); - methodProcessor.receiveMessageHeader(channelId, properties, bodySize); + methodProcessor.receiveMessageHeader(properties, bodySize); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java index 7548db6e93..e8dc2ae442 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java @@ -122,13 +122,17 @@ public class ExchangeBoundBody extends AMQMethodBodyImpl implements EncodableAMQ return buf.toString(); } - public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException { AMQShortString exchange = buffer.readAMQShortString(); AMQShortString routingKey = buffer.readAMQShortString(); AMQShortString queue = buffer.readAMQShortString(); - dispatcher.receiveExchangeBound(channelId, exchange, routingKey, queue); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveExchangeBound(exchange, routingKey, queue); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java index 6b02b066ae..ef91c1d635 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java @@ -115,12 +115,16 @@ public class ExchangeBoundOkBody extends AMQMethodBodyImpl implements EncodableA return buf.toString(); } - public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput buffer, + final ClientChannelMethodProcessor dispatcher) throws IOException { int replyCode = buffer.readUnsignedShort(); AMQShortString replyText = buffer.readAMQShortString(); - dispatcher.receiveExchangeBoundOk(channelId, replyCode, replyText); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveExchangeBoundOk(replyCode, replyText); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java index 06e590f8e5..4001ba7aa0 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java @@ -204,9 +204,8 @@ public class ExchangeDeclareBody extends AMQMethodBodyImpl implements EncodableA return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { int ticket = buffer.readUnsignedShort(); @@ -219,14 +218,16 @@ public class ExchangeDeclareBody extends AMQMethodBodyImpl implements EncodableA boolean internal = (bitfield & 0x8) == 0x8; boolean nowait = (bitfield & 0x10) == 0x10; FieldTable arguments = EncodingUtils.readFieldTable(buffer); - dispatcher.receiveExchangeDeclare(channelId, - exchange, - type, - passive, - durable, - autoDelete, - internal, - nowait, - arguments); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveExchangeDeclare(exchange, + type, + passive, + durable, + autoDelete, + internal, + nowait, + arguments); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java index 4a30e25502..f4646315cd 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java @@ -138,7 +138,8 @@ public class ExchangeDeleteBody extends AMQMethodBodyImpl implements EncodableAM return buf.toString(); } - public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException { @@ -147,6 +148,9 @@ public class ExchangeDeleteBody extends AMQMethodBodyImpl implements EncodableAM byte bitfield = buffer.readByte(); boolean ifUnused = (bitfield & 0x01) == 0x01; boolean nowait = (bitfield & 0x02) == 0x02; - dispatcher.receiveExchangeDelete(channelId, exchange, ifUnused, nowait); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveExchangeDelete(exchange, ifUnused, nowait); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java index 1ad0f3081b..19b091a359 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java +++ b/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java @@ -23,7 +23,9 @@ package org.apache.qpid.framing; import java.util.ArrayList; import java.util.List; -public class FrameCreatingMethodProcessor implements MethodProcessor +public class FrameCreatingMethodProcessor implements MethodProcessor<FrameCreatingMethodProcessor.ClientAndServerChannelMethodProcessor>, + ClientMethodProcessor<FrameCreatingMethodProcessor.ClientAndServerChannelMethodProcessor>, + ServerMethodProcessor<FrameCreatingMethodProcessor.ClientAndServerChannelMethodProcessor> { private ProtocolVersion _protocolVersion; @@ -61,42 +63,6 @@ public class FrameCreatingMethodProcessor implements MethodProcessor } @Override - public void receiveTxSelect(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, TxSelectBody.INSTANCE)); - } - - @Override - public void receiveTxSelectOk(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, TxSelectOkBody.INSTANCE)); - } - - @Override - public void receiveTxCommit(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, TxCommitBody.INSTANCE)); - } - - @Override - public void receiveTxCommitOk(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, TxCommitOkBody.INSTANCE)); - } - - @Override - public void receiveTxRollback(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, TxRollbackBody.INSTANCE)); - } - - @Override - public void receiveTxRollbackOk(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, TxRollbackOkBody.INSTANCE)); - } - - @Override public void receiveConnectionSecure(final byte[] challenge) { _processedMethods.add(new AMQFrame(0, new ConnectionSecureBody(challenge))); @@ -163,382 +129,483 @@ public class FrameCreatingMethodProcessor implements MethodProcessor _processedMethods.add(new AMQFrame(channelId, new ChannelOpenBody())); } - @Override - public void receiveChannelOpenOk(final int channelId) + private void receiveExchangeBoundOk(final int channelId, final int replyCode, final AMQShortString replyText) { - _processedMethods.add(new AMQFrame(channelId, ProtocolVersion.v8_0.equals(getProtocolVersion()) - ? ChannelOpenOkBody.INSTANCE_0_8 - : ChannelOpenOkBody.INSTANCE_0_9)); + _processedMethods.add(new AMQFrame(channelId, new ExchangeBoundOkBody(replyCode, replyText))); } @Override - public void receiveChannelFlow(final int channelId, final boolean active) + public void receiveHeartbeat() { - _processedMethods.add(new AMQFrame(channelId, new ChannelFlowBody(active))); + _processedMethods.add(new AMQFrame(0, new HeartbeatBody())); } @Override - public void receiveChannelFlowOk(final int channelId, final boolean active) + public ProtocolVersion getProtocolVersion() { - _processedMethods.add(new AMQFrame(channelId, new ChannelFlowOkBody(active))); + return _protocolVersion; } @Override - public void receiveChannelAlert(final int channelId, - final int replyCode, - final AMQShortString replyText, - final FieldTable details) + public ClientAndServerChannelMethodProcessor getChannelMethodProcessor(final int channelId) { - _processedMethods.add(new AMQFrame(channelId, new ChannelAlertBody(replyCode, replyText, details))); + return new FrameCreatingChannelMethodProcessor(channelId); } - @Override - public void receiveChannelClose(final int channelId, - final int replyCode, - final AMQShortString replyText, - final int classId, - final int methodId) + public void setProtocolVersion(final ProtocolVersion protocolVersion) { - _processedMethods.add(new AMQFrame(channelId, new ChannelCloseBody(replyCode, replyText, classId, methodId))); + _protocolVersion = protocolVersion; } @Override - public void receiveChannelCloseOk(final int channelId) + public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation) { - _processedMethods.add(new AMQFrame(channelId, ChannelCloseOkBody.INSTANCE)); + _processedMethods.add(protocolInitiation); } @Override - public void receiveAccessRequest(final int channelId, - final AMQShortString realm, - final boolean exclusive, - final boolean passive, - final boolean active, - final boolean write, - final boolean read) + public void setCurrentMethod(final int classId, final int methodId) { - _processedMethods.add(new AMQFrame(channelId, new AccessRequestBody(realm, exclusive, passive, active, write, read))); + _classId = classId; + _methodId = methodId; } @Override - public void receiveAccessRequestOk(final int channelId, final int ticket) + public boolean ignoreAllButCloseOk() { - _processedMethods.add(new AMQFrame(channelId, new AccessRequestOkBody(ticket))); + return false; } - @Override - public void receiveExchangeDeclare(final int channelId, - final AMQShortString exchange, - final AMQShortString type, - final boolean passive, - final boolean durable, - final boolean autoDelete, - final boolean internal, - final boolean nowait, final FieldTable arguments) + public int getClassId() { - _processedMethods.add(new AMQFrame(channelId, new ExchangeDeclareBody(0, exchange, type, passive, durable, autoDelete, internal, nowait, arguments))); + return _classId; } - @Override - public void receiveExchangeDeclareOk(final int channelId) + public int getMethodId() { - _processedMethods.add(new AMQFrame(channelId, new ExchangeDeclareOkBody())); + return _methodId; } - @Override - public void receiveExchangeDelete(final int channelId, - final AMQShortString exchange, - final boolean ifUnused, - final boolean nowait) + public static interface ClientAndServerChannelMethodProcessor extends ServerChannelMethodProcessor, ClientChannelMethodProcessor { - _processedMethods.add(new AMQFrame(channelId, new ExchangeDeleteBody(0, exchange, ifUnused, nowait))); - } - @Override - public void receiveExchangeDeleteOk(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, new ExchangeDeleteOkBody())); } - @Override - public void receiveExchangeBound(final int channelId, - final AMQShortString exchange, - final AMQShortString routingKey, - final AMQShortString queue) + private class FrameCreatingChannelMethodProcessor implements ClientAndServerChannelMethodProcessor { - _processedMethods.add(new AMQFrame(channelId, new ExchangeBoundBody(exchange, routingKey, queue))); - } + private final int _channelId; - @Override - public void receiveExchangeBoundOk(final int channelId, final int replyCode, final AMQShortString replyText) - { - _processedMethods.add(new AMQFrame(channelId, new ExchangeBoundOkBody(replyCode, replyText))); - } + private FrameCreatingChannelMethodProcessor(final int channelId) + { + _channelId = channelId; + } - @Override - public void receiveQueueBindOk(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, new QueueBindOkBody())); - } - @Override - public void receiveQueueUnbindOk(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, new QueueUnbindOkBody())); - } + @Override + public void receiveChannelOpenOk() + { + _processedMethods.add(new AMQFrame(_channelId, ProtocolVersion.v8_0.equals(getProtocolVersion()) + ? ChannelOpenOkBody.INSTANCE_0_8 + : ChannelOpenOkBody.INSTANCE_0_9)); + } - @Override - public void receiveQueueDeclare(final int channelId, - final AMQShortString queue, - final boolean passive, - final boolean durable, - final boolean exclusive, - final boolean autoDelete, - final boolean nowait, - final FieldTable arguments) - { - _processedMethods.add(new AMQFrame(channelId, new QueueDeclareBody(0, queue, passive, durable, exclusive, autoDelete, nowait, arguments))); - } + @Override + public void receiveChannelAlert(final int replyCode, final AMQShortString replyText, final FieldTable details) + { + _processedMethods.add(new AMQFrame(_channelId, new ChannelAlertBody(replyCode, replyText, details))); + } - @Override - public void receiveQueueDeclareOk(final int channelId, - final AMQShortString queue, - final long messageCount, - final long consumerCount) - { - _processedMethods.add(new AMQFrame(channelId, new QueueDeclareOkBody(queue, messageCount, consumerCount))); - } + @Override + public void receiveAccessRequestOk(final int ticket) + { + _processedMethods.add(new AMQFrame(_channelId, new AccessRequestOkBody(ticket))); + } - @Override - public void receiveQueueBind(final int channelId, - final AMQShortString queue, - final AMQShortString exchange, - final AMQShortString bindingKey, - final boolean nowait, - final FieldTable arguments) - { - _processedMethods.add(new AMQFrame(channelId, new QueueBindBody(0, queue, exchange, bindingKey, nowait, arguments))); - } + @Override + public void receiveExchangeDeclareOk() + { + _processedMethods.add(new AMQFrame(_channelId, new ExchangeDeclareOkBody())); + } - @Override - public void receiveQueuePurge(final int channelId, final AMQShortString queue, final boolean nowait) - { - _processedMethods.add(new AMQFrame(channelId, new QueuePurgeBody(0, queue, nowait))); - } + @Override + public void receiveExchangeDeleteOk() + { + _processedMethods.add(new AMQFrame(_channelId, new ExchangeDeleteOkBody())); + } - @Override - public void receiveQueuePurgeOk(final int channelId, final long messageCount) - { - _processedMethods.add(new AMQFrame(channelId, new QueuePurgeOkBody(messageCount))); - } + @Override + public void receiveExchangeBoundOk(final int replyCode, final AMQShortString replyText) + { + FrameCreatingMethodProcessor.this.receiveExchangeBoundOk(_channelId, replyCode, replyText); + } - @Override - public void receiveQueueDelete(final int channelId, - final AMQShortString queue, - final boolean ifUnused, - final boolean ifEmpty, - final boolean nowait) - { - _processedMethods.add(new AMQFrame(channelId, new QueueDeleteBody(0, queue, ifUnused, ifEmpty, nowait))); - } + @Override + public void receiveQueueBindOk() + { + _processedMethods.add(new AMQFrame(_channelId, new QueueBindOkBody())); + } - @Override - public void receiveQueueDeleteOk(final int channelId, final long messageCount) - { - _processedMethods.add(new AMQFrame(channelId, new QueueDeleteOkBody(messageCount))); - } + @Override + public void receiveQueueUnbindOk() + { + _processedMethods.add(new AMQFrame(_channelId, new QueueUnbindOkBody())); + } - @Override - public void receiveQueueUnbind(final int channelId, - final AMQShortString queue, - final AMQShortString exchange, - final AMQShortString bindingKey, - final FieldTable arguments) - { - _processedMethods.add(new AMQFrame(channelId, new QueueUnbindBody(0, queue, exchange, bindingKey, arguments))); - } + @Override + public void receiveQueueDeclareOk(final AMQShortString queue, final long messageCount, final long consumerCount) + { + _processedMethods.add(new AMQFrame(_channelId, new QueueDeclareOkBody(queue, messageCount, consumerCount))); + } - @Override - public void receiveBasicRecoverSyncOk(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, new BasicRecoverSyncOkBody(getProtocolVersion()))); - } + @Override + public void receiveQueuePurgeOk(final long messageCount) + { + _processedMethods.add(new AMQFrame(_channelId, new QueuePurgeOkBody(messageCount))); + } - @Override - public void receiveBasicRecover(final int channelId, final boolean requeue, final boolean sync) - { - if(ProtocolVersion.v8_0.equals(getProtocolVersion()) || !sync) + @Override + public void receiveQueueDeleteOk(final long messageCount) { - _processedMethods.add(new AMQFrame(channelId, new BasicRecoverBody(requeue))); + _processedMethods.add(new AMQFrame(_channelId, new QueueDeleteOkBody(messageCount))); } - else + + @Override + public void receiveBasicRecoverSyncOk() { - _processedMethods.add(new AMQFrame(channelId, new BasicRecoverSyncBody(getProtocolVersion(), requeue))); + _processedMethods.add(new AMQFrame(_channelId, new BasicRecoverSyncOkBody(getProtocolVersion()))); } - } - @Override - public void receiveBasicQos(final int channelId, - final long prefetchSize, - final int prefetchCount, - final boolean global) - { - _processedMethods.add(new AMQFrame(channelId, new BasicQosBody(prefetchSize, prefetchCount, global))); - } + @Override + public void receiveBasicQosOk() + { + _processedMethods.add(new AMQFrame(_channelId, new BasicQosOkBody())); + } - @Override - public void receiveBasicQosOk(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, new BasicQosOkBody())); - } + @Override + public void receiveBasicConsumeOk(final AMQShortString consumerTag) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicConsumeOkBody(consumerTag))); + } - @Override - public void receiveBasicConsume(final int channelId, - final AMQShortString queue, - final AMQShortString consumerTag, - final boolean noLocal, - final boolean noAck, - final boolean exclusive, - final boolean nowait, - final FieldTable arguments) - { - _processedMethods.add(new AMQFrame(channelId, new BasicConsumeBody(0, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments))); - } + @Override + public void receiveBasicCancelOk(final AMQShortString consumerTag) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicCancelOkBody(consumerTag))); + } - @Override - public void receiveBasicConsumeOk(final int channelId, final AMQShortString consumerTag) - { - _processedMethods.add(new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag))); - } + @Override + public void receiveBasicReturn(final int replyCode, + final AMQShortString replyText, + final AMQShortString exchange, + final AMQShortString routingKey) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicReturnBody(replyCode, + replyText, + exchange, + routingKey))); + } - @Override - public void receiveBasicCancel(final int channelId, final AMQShortString consumerTag, final boolean noWait) - { - _processedMethods.add(new AMQFrame(channelId, new BasicCancelBody(consumerTag, noWait))); - } + @Override + public void receiveBasicDeliver(final AMQShortString consumerTag, + final long deliveryTag, + final boolean redelivered, + final AMQShortString exchange, + final AMQShortString routingKey) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicDeliverBody(consumerTag, + deliveryTag, + redelivered, + exchange, + routingKey))); + } - @Override - public void receiveBasicCancelOk(final int channelId, final AMQShortString consumerTag) - { - _processedMethods.add(new AMQFrame(channelId, new BasicCancelOkBody(consumerTag))); - } + @Override + public void receiveBasicGetOk(final long deliveryTag, + final boolean redelivered, + final AMQShortString exchange, + final AMQShortString routingKey, + final long messageCount) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicGetOkBody(deliveryTag, + redelivered, + exchange, + routingKey, + messageCount))); + } - @Override - public void receiveBasicPublish(final int channelId, - final AMQShortString exchange, - final AMQShortString routingKey, - final boolean mandatory, - final boolean immediate) - { - _processedMethods.add(new AMQFrame(channelId, new BasicPublishBody(0, exchange, routingKey, mandatory, immediate))); - } + @Override + public void receiveBasicGetEmpty() + { + _processedMethods.add(new AMQFrame(_channelId, new BasicGetEmptyBody((AMQShortString)null))); + } - @Override - public void receiveBasicReturn(final int channelId, final int replyCode, - final AMQShortString replyText, - final AMQShortString exchange, - final AMQShortString routingKey) - { - _processedMethods.add(new AMQFrame(channelId, new BasicReturnBody(replyCode, replyText, exchange, routingKey))); - } + @Override + public void receiveTxSelectOk() + { + _processedMethods.add(new AMQFrame(_channelId, TxSelectOkBody.INSTANCE)); + } - @Override - public void receiveBasicDeliver(final int channelId, - final AMQShortString consumerTag, - final long deliveryTag, - final boolean redelivered, - final AMQShortString exchange, - final AMQShortString routingKey) - { - _processedMethods.add(new AMQFrame(channelId, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey))); - } + @Override + public void receiveTxCommitOk() + { + _processedMethods.add(new AMQFrame(_channelId, TxCommitOkBody.INSTANCE)); + } - @Override - public void receiveBasicGet(final int channelId, final AMQShortString queue, final boolean noAck) - { - _processedMethods.add(new AMQFrame(channelId, new BasicGetBody(0, queue, noAck))); - } + @Override + public void receiveTxRollbackOk() + { + _processedMethods.add(new AMQFrame(_channelId, TxRollbackOkBody.INSTANCE)); + } - @Override - public void receiveBasicGetOk(final int channelId, - final long deliveryTag, - final boolean redelivered, - final AMQShortString exchange, - final AMQShortString routingKey, - final long messageCount) - { - _processedMethods.add(new AMQFrame(channelId, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount))); - } + @Override + public void receiveAccessRequest(final AMQShortString realm, + final boolean exclusive, + final boolean passive, + final boolean active, + final boolean write, + final boolean read) + { + _processedMethods.add(new AMQFrame(_channelId, new AccessRequestBody(realm, + exclusive, + passive, + active, + write, + read))); + } - @Override - public void receiveBasicGetEmpty(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, new BasicGetEmptyBody((AMQShortString)null))); - } + @Override + public void receiveExchangeDeclare(final AMQShortString exchange, + final AMQShortString type, + final boolean passive, + final boolean durable, + final boolean autoDelete, + final boolean internal, + final boolean nowait, + final FieldTable arguments) + { + _processedMethods.add(new AMQFrame(_channelId, new ExchangeDeclareBody(0, + exchange, + type, + passive, + durable, + autoDelete, + internal, + nowait, + arguments))); + } - @Override - public void receiveBasicAck(final int channelId, final long deliveryTag, final boolean multiple) - { - _processedMethods.add(new AMQFrame(channelId, new BasicAckBody(deliveryTag, multiple))); - } + @Override + public void receiveExchangeDelete(final AMQShortString exchange, final boolean ifUnused, final boolean nowait) + { + _processedMethods.add(new AMQFrame(_channelId, new ExchangeDeleteBody(0, exchange, ifUnused, nowait))); + } - @Override - public void receiveBasicReject(final int channelId, final long deliveryTag, final boolean requeue) - { - _processedMethods.add(new AMQFrame(channelId, new BasicRejectBody(deliveryTag, requeue))); - } + @Override + public void receiveExchangeBound(final AMQShortString exchange, + final AMQShortString routingKey, + final AMQShortString queue) + { + _processedMethods.add(new AMQFrame(_channelId, new ExchangeBoundBody(exchange, routingKey, queue))); + } - @Override - public void receiveHeartbeat() - { - _processedMethods.add(new AMQFrame(0, new HeartbeatBody())); - } + @Override + public void receiveQueueDeclare(final AMQShortString queue, + final boolean passive, + final boolean durable, + final boolean exclusive, + final boolean autoDelete, + final boolean nowait, + final FieldTable arguments) + { + _processedMethods.add(new AMQFrame(_channelId, new QueueDeclareBody(0, + queue, + passive, + durable, + exclusive, + autoDelete, + nowait, + arguments))); + } - @Override - public ProtocolVersion getProtocolVersion() - { - return _protocolVersion; - } + @Override + public void receiveQueueBind(final AMQShortString queue, + final AMQShortString exchange, + final AMQShortString bindingKey, + final boolean nowait, + final FieldTable arguments) + { + _processedMethods.add(new AMQFrame(_channelId, new QueueBindBody(0, + queue, + exchange, + bindingKey, + nowait, + arguments))); + } - public void setProtocolVersion(final ProtocolVersion protocolVersion) - { - _protocolVersion = protocolVersion; - } + @Override + public void receiveQueuePurge(final AMQShortString queue, final boolean nowait) + { + _processedMethods.add(new AMQFrame(_channelId, new QueuePurgeBody(0, queue, nowait))); + } - @Override - public void receiveMessageContent(final int channelId, final byte[] data) - { - _processedMethods.add(new AMQFrame(channelId, new ContentBody(data))); - } + @Override + public void receiveQueueDelete(final AMQShortString queue, + final boolean ifUnused, + final boolean ifEmpty, + final boolean nowait) + { + _processedMethods.add(new AMQFrame(_channelId, new QueueDeleteBody(0, queue, ifUnused, ifEmpty, nowait))); + } - @Override - public void receiveMessageHeader(final int channelId, - final BasicContentHeaderProperties properties, - final long bodySize) - { - _processedMethods.add(new AMQFrame(channelId, new ContentHeaderBody(properties, bodySize))); - } + @Override + public void receiveQueueUnbind(final AMQShortString queue, + final AMQShortString exchange, + final AMQShortString bindingKey, + final FieldTable arguments) + { + _processedMethods.add(new AMQFrame(_channelId, new QueueUnbindBody(0, + queue, + exchange, + bindingKey, + arguments))); + } - @Override - public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation) - { - _processedMethods.add(protocolInitiation); - } + @Override + public void receiveBasicRecover(final boolean requeue, final boolean sync) + { + if(ProtocolVersion.v8_0.equals(getProtocolVersion()) || !sync) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicRecoverBody(requeue))); + } + else + { + _processedMethods.add(new AMQFrame(_channelId, new BasicRecoverSyncBody(getProtocolVersion(), requeue))); + } + } - @Override - public void setCurrentMethod(final int classId, final int methodId) - { - _classId = classId; - _methodId = methodId; - } + @Override + public void receiveBasicQos(final long prefetchSize, final int prefetchCount, final boolean global) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicQosBody(prefetchSize, prefetchCount, global))); + } - public int getClassId() - { - return _classId; - } + @Override + public void receiveBasicConsume(final AMQShortString queue, + final AMQShortString consumerTag, + final boolean noLocal, + final boolean noAck, + final boolean exclusive, + final boolean nowait, + final FieldTable arguments) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicConsumeBody(0, + queue, + consumerTag, + noLocal, + noAck, + exclusive, + nowait, + arguments))); + } - public int getMethodId() - { - return _methodId; + @Override + public void receiveBasicCancel(final AMQShortString consumerTag, final boolean noWait) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicCancelBody(consumerTag, noWait))); + } + + @Override + public void receiveBasicPublish(final AMQShortString exchange, + final AMQShortString routingKey, + final boolean mandatory, + final boolean immediate) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicPublishBody(0, + exchange, + routingKey, + mandatory, + immediate))); + } + + @Override + public void receiveBasicGet(final AMQShortString queue, final boolean noAck) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicGetBody(0, queue, noAck))); + } + + @Override + public void receiveBasicAck(final long deliveryTag, final boolean multiple) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicAckBody(deliveryTag, multiple))); + } + + @Override + public void receiveBasicReject(final long deliveryTag, final boolean requeue) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicRejectBody(deliveryTag, requeue))); + } + + @Override + public void receiveTxSelect() + { + _processedMethods.add(new AMQFrame(_channelId, TxSelectBody.INSTANCE)); + } + + @Override + public void receiveTxCommit() + { + _processedMethods.add(new AMQFrame(_channelId, TxCommitBody.INSTANCE)); + } + + @Override + public void receiveTxRollback() + { + _processedMethods.add(new AMQFrame(_channelId, TxRollbackBody.INSTANCE)); + } + + @Override + public void receiveChannelFlow(final boolean active) + { + _processedMethods.add(new AMQFrame(_channelId, new ChannelFlowBody(active))); + } + + @Override + public void receiveChannelFlowOk(final boolean active) + { + _processedMethods.add(new AMQFrame(_channelId, new ChannelFlowOkBody(active))); + } + + @Override + public void receiveChannelClose(final int replyCode, + final AMQShortString replyText, + final int classId, + final int methodId) + { + _processedMethods.add(new AMQFrame(_channelId, new ChannelCloseBody(replyCode, replyText, classId, methodId))); + } + + @Override + public void receiveChannelCloseOk() + { + _processedMethods.add(new AMQFrame(_channelId, ChannelCloseOkBody.INSTANCE)); + } + + @Override + public void receiveMessageContent(final byte[] data) + { + _processedMethods.add(new AMQFrame(_channelId, new ContentBody(data))); + } + + @Override + public void receiveMessageHeader(final BasicContentHeaderProperties properties, final long bodySize) + { + _processedMethods.add(new AMQFrame(_channelId, new ContentHeaderBody(properties, bodySize))); + } + + @Override + public boolean ignoreAllButCloseOk() + { + return false; + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java index 0b08059631..62c0cd3c6d 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java +++ b/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java @@ -20,184 +20,21 @@ */ package org.apache.qpid.framing; -public interface MethodProcessor +public interface MethodProcessor<T extends ChannelMethodProcessor> { ProtocolVersion getProtocolVersion(); - void receiveConnectionStart(short versionMajor, - short versionMinor, - FieldTable serverProperties, - byte[] mechanisms, - byte[] locales); - - void receiveConnectionStartOk(FieldTable clientProperties, - AMQShortString mechanism, - byte[] response, - AMQShortString locale); - - void receiveTxSelect(int channelId); - - void receiveTxSelectOk(int channelId); - - void receiveTxCommit(int channelId); - - void receiveTxCommitOk(int channelId); - - void receiveTxRollback(int channelId); - - void receiveTxRollbackOk(int channelId); - - void receiveConnectionSecure(byte[] challenge); - - void receiveConnectionSecureOk(byte[] response); - - void receiveConnectionTune(int channelMax, long frameMax, int heartbeat); - - void receiveConnectionTuneOk(int channelMax, long frameMax, int heartbeat); - - void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist); - - void receiveConnectionOpenOk(AMQShortString knownHosts); - - void receiveConnectionRedirect(AMQShortString host, AMQShortString knownHosts); + T getChannelMethodProcessor(int channelId); void receiveConnectionClose(int replyCode, AMQShortString replyText, int classId, int methodId); void receiveConnectionCloseOk(); - void receiveChannelOpen(int channelId); - - void receiveChannelOpenOk(int channelId); - - void receiveChannelFlow(int channelId, boolean active); - - void receiveChannelFlowOk(int channelId, boolean active); - - void receiveChannelAlert(int channelId, int replyCode, final AMQShortString replyText, FieldTable details); - - void receiveChannelClose(int channelId, int replyCode, AMQShortString replyText, int classId, int methodId); - - void receiveChannelCloseOk(int channelId); - - void receiveAccessRequest(int channelId, - AMQShortString realm, - boolean exclusive, - boolean passive, - boolean active, - boolean write, boolean read); - - void receiveAccessRequestOk(int channelId, int ticket); - - void receiveExchangeDeclare(int channelId, - AMQShortString exchange, - AMQShortString type, - boolean passive, - boolean durable, - boolean autoDelete, boolean internal, boolean nowait, final FieldTable arguments); - - void receiveExchangeDeclareOk(int channelId); - - void receiveExchangeDelete(int channelId, AMQShortString exchange, boolean ifUnused, boolean nowait); - - void receiveExchangeDeleteOk(int channelId); - - void receiveExchangeBound(int channelId, AMQShortString exchange, AMQShortString routingKey, AMQShortString queue); - - void receiveExchangeBoundOk(int channelId, int replyCode, AMQShortString replyText); - - void receiveQueueBindOk(int channelId); - - void receiveQueueUnbindOk(final int channelId); - - void receiveQueueDeclare(int channelId, - AMQShortString queue, - boolean passive, - boolean durable, - boolean exclusive, - boolean autoDelete, boolean nowait, FieldTable arguments); - - void receiveQueueDeclareOk(int channelId, final AMQShortString queue, long messageCount, long consumerCount); - - void receiveQueueBind(int channelId, - AMQShortString queue, - AMQShortString exchange, - AMQShortString bindingKey, - boolean nowait, FieldTable arguments); - - void receiveQueuePurge(int channelId, AMQShortString queue, boolean nowait); - - void receiveQueuePurgeOk(int channelId, long messageCount); - - void receiveQueueDelete(int channelId, AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait); - - void receiveQueueDeleteOk(int channelId, long messageCount); - - void receiveQueueUnbind(int channelId, - AMQShortString queue, - AMQShortString exchange, - AMQShortString bindingKey, - FieldTable arguments); - - void receiveBasicRecoverSyncOk(int channelId); - - void receiveBasicRecover(int channelId, final boolean requeue, boolean sync); - - void receiveBasicQos(int channelId, long prefetchSize, int prefetchCount, boolean global); - - void receiveBasicQosOk(int channelId); - - void receiveBasicConsume(int channelId, - AMQShortString queue, - AMQShortString consumerTag, - boolean noLocal, - boolean noAck, - boolean exclusive, boolean nowait, FieldTable arguments); - - void receiveBasicConsumeOk(int channelId, AMQShortString consumerTag); - - void receiveBasicCancel(int channelId, AMQShortString consumerTag, boolean noWait); - - void receiveBasicCancelOk(int channelId, AMQShortString consumerTag); - - void receiveBasicPublish(int channelId, - AMQShortString exchange, - AMQShortString routingKey, - boolean mandatory, - boolean immediate); - - void receiveBasicReturn(final int channelId, - int replyCode, - AMQShortString replyText, - AMQShortString exchange, - AMQShortString routingKey); - - void receiveBasicDeliver(int channelId, - AMQShortString consumerTag, - long deliveryTag, - boolean redelivered, - AMQShortString exchange, AMQShortString routingKey); - - void receiveBasicGet(int channelId, AMQShortString queue, boolean noAck); - - void receiveBasicGetOk(int channelId, - long deliveryTag, - boolean redelivered, - AMQShortString exchange, - AMQShortString routingKey, long messageCount); - - void receiveBasicGetEmpty(int channelId); - - void receiveBasicAck(int channelId, long deliveryTag, boolean multiple); - - void receiveBasicReject(int channelId, long deliveryTag, boolean requeue); - void receiveHeartbeat(); - void receiveMessageContent(int channelId, byte[] data); - - void receiveMessageHeader(int channelId, BasicContentHeaderProperties properties, long bodySize); - void receiveProtocolHeader(ProtocolInitiation protocolInitiation); void setCurrentMethod(int classId, int methodId); + + boolean ignoreAllButCloseOk(); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java index e4419f77e3..2b7e26a7f0 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java @@ -165,9 +165,8 @@ public class QueueBindBody extends AMQMethodBodyImpl implements EncodableAMQData return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { int ticket = buffer.readUnsignedShort(); @@ -176,6 +175,9 @@ public class QueueBindBody extends AMQMethodBodyImpl implements EncodableAMQData AMQShortString bindingKey = buffer.readAMQShortString(); boolean nowait = (buffer.readByte() & 0x01) == 0x01; FieldTable arguments = EncodingUtils.readFieldTable(buffer); - dispatcher.receiveQueueBind(channelId, queue, exchange, bindingKey, nowait, arguments); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveQueueBind(queue, exchange, bindingKey, nowait, arguments); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java index 1f9888c76a..5a359dc8df 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java @@ -191,9 +191,8 @@ public class QueueDeclareBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { int ticket = buffer.readUnsignedShort(); @@ -206,6 +205,9 @@ public class QueueDeclareBody extends AMQMethodBodyImpl implements EncodableAMQD boolean autoDelete = (bitfield & 0x08 ) == 0x08; boolean nowait = (bitfield & 0x010 ) == 0x010; FieldTable arguments = EncodingUtils.readFieldTable(buffer); - dispatcher.receiveQueueDeclare(channelId, queue, passive, durable, exclusive, autoDelete, nowait, arguments); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveQueueDeclare(queue, passive, durable, exclusive, autoDelete, nowait, arguments); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java index 9857bb3a39..cf6fc656b3 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java @@ -120,13 +120,15 @@ public class QueueDeclareOkBody extends AMQMethodBodyImpl implements EncodableAM return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ClientChannelMethodProcessor dispatcher) throws IOException { AMQShortString queue = buffer.readAMQShortString(); long messageCount = EncodingUtils.readUnsignedInteger(buffer); long consumerCount = EncodingUtils.readUnsignedInteger(buffer); - dispatcher.receiveQueueDeclareOk(channelId, queue, messageCount, consumerCount); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveQueueDeclareOk(queue, messageCount, consumerCount); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java index 408f9f9667..ea933dc644 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java @@ -151,9 +151,8 @@ public class QueueDeleteBody extends AMQMethodBodyImpl implements EncodableAMQDa return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException { int ticket = buffer.readUnsignedShort(); @@ -163,6 +162,9 @@ public class QueueDeleteBody extends AMQMethodBodyImpl implements EncodableAMQDa boolean ifUnused = (bitfield & 0x01) == 0x01; boolean ifEmpty = (bitfield & 0x02) == 0x02; boolean nowait = (bitfield & 0x04) == 0x04; - dispatcher.receiveQueueDelete(channelId, queue, ifUnused, ifEmpty, nowait); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveQueueDelete(queue, ifUnused, ifEmpty, nowait); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java index b43369b68a..6d50153c15 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java @@ -95,11 +95,13 @@ public class QueueDeleteOkBody extends AMQMethodBodyImpl implements EncodableAMQ return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ClientChannelMethodProcessor dispatcher) throws IOException { long messageCount = EncodingUtils.readUnsignedInteger(buffer); - dispatcher.receiveQueueDeleteOk(channelId, messageCount); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveQueueDeleteOk(messageCount); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java index 5a04e21355..58a424387c 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java @@ -125,14 +125,16 @@ public class QueuePurgeBody extends AMQMethodBodyImpl implements EncodableAMQDat return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException { int ticket = buffer.readUnsignedShort(); AMQShortString queue = buffer.readAMQShortString(); boolean nowait = (buffer.readByte() & 0x01) == 0x01; - dispatcher.receiveQueuePurge(channelId, queue, nowait); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveQueuePurge(queue, nowait); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java index 40cac8b390..acab2bc052 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java @@ -95,11 +95,13 @@ public class QueuePurgeOkBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ClientChannelMethodProcessor dispatcher) throws IOException { long messageCount = EncodingUtils.readUnsignedInteger(buffer); - dispatcher.receiveQueuePurgeOk(channelId, messageCount); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveQueuePurgeOk(messageCount); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java index a6f3e5b4c5..30c5d19d27 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java @@ -147,9 +147,8 @@ public class QueueUnbindBody extends AMQMethodBodyImpl implements EncodableAMQDa return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { int ticket = buffer.readUnsignedShort(); @@ -157,6 +156,9 @@ public class QueueUnbindBody extends AMQMethodBodyImpl implements EncodableAMQDa AMQShortString exchange = buffer.readAMQShortString(); AMQShortString routingKey = buffer.readAMQShortString(); FieldTable arguments = EncodingUtils.readFieldTable(buffer); - dispatcher.receiveQueueUnbind(channelId, queue, exchange, routingKey, arguments); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveQueueUnbind(queue, exchange, routingKey, arguments); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java new file mode 100644 index 0000000000..89b75c2d2f --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java @@ -0,0 +1,92 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +public interface ServerChannelMethodProcessor extends ChannelMethodProcessor +{ + void receiveAccessRequest(AMQShortString realm, + boolean exclusive, + boolean passive, + boolean active, + boolean write, boolean read); + + void receiveExchangeDeclare(AMQShortString exchange, + AMQShortString type, + boolean passive, + boolean durable, + boolean autoDelete, boolean internal, boolean nowait, final FieldTable arguments); + + void receiveExchangeDelete(AMQShortString exchange, boolean ifUnused, boolean nowait); + + void receiveExchangeBound(AMQShortString exchange, AMQShortString routingKey, AMQShortString queue); + + void receiveQueueDeclare(AMQShortString queue, + boolean passive, + boolean durable, + boolean exclusive, + boolean autoDelete, boolean nowait, FieldTable arguments); + + void receiveQueueBind(AMQShortString queue, + AMQShortString exchange, + AMQShortString bindingKey, + boolean nowait, FieldTable arguments); + + void receiveQueuePurge(AMQShortString queue, boolean nowait); + + void receiveQueueDelete(AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait); + + void receiveQueueUnbind(AMQShortString queue, + AMQShortString exchange, + AMQShortString bindingKey, + FieldTable arguments); + + void receiveBasicRecover(final boolean requeue, boolean sync); + + void receiveBasicQos(long prefetchSize, int prefetchCount, boolean global); + + void receiveBasicConsume(AMQShortString queue, + AMQShortString consumerTag, + boolean noLocal, + boolean noAck, + boolean exclusive, boolean nowait, FieldTable arguments); + + void receiveBasicCancel(AMQShortString consumerTag, boolean noWait); + + void receiveBasicPublish(AMQShortString exchange, + AMQShortString routingKey, + boolean mandatory, + boolean immediate); + + void receiveBasicGet(AMQShortString queue, boolean noAck); + + void receiveBasicAck(long deliveryTag, boolean multiple); + + void receiveBasicReject(long deliveryTag, boolean requeue); + + + + void receiveTxSelect(); + + void receiveTxCommit(); + + void receiveTxRollback(); + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ServerMethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/ServerMethodProcessor.java new file mode 100644 index 0000000000..77b4a1fc6b --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/ServerMethodProcessor.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +public interface ServerMethodProcessor<T extends ServerChannelMethodProcessor> extends MethodProcessor<T> +{ + void receiveConnectionStartOk(FieldTable clientProperties, + AMQShortString mechanism, + byte[] response, + AMQShortString locale); + + void receiveConnectionSecureOk(byte[] response); + + void receiveConnectionTuneOk(int channelMax, long frameMax, int heartbeat); + + void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist); + + void receiveChannelOpen(int channelId); + + +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java b/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java index bd3e9bbcbc..61d5f0629c 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java +++ b/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java @@ -20,10 +20,10 @@ */ package org.apache.qpid.transport.util; -import java.nio.ByteBuffer; - import static java.lang.Math.min; +import java.nio.ByteBuffer; + /** * Functions @@ -33,6 +33,9 @@ import static java.lang.Math.min; public final class Functions { + private static final char[] HEX_CHARACTERS = + {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; + private Functions() { } @@ -102,4 +105,21 @@ public final class Functions return str(ByteBuffer.wrap(bytes), limit); } + public static String hex(byte[] bytes, int limit) + { + limit = Math.min(limit, bytes == null ? 0 : bytes.length); + StringBuilder sb = new StringBuilder(3 + limit*2); + for(int i = 0; i < limit; i++) + { + sb.append(HEX_CHARACTERS[(((int)bytes[i]) & 0xf0)>>4]); + sb.append(HEX_CHARACTERS[(((int)bytes[i]) & 0x0f)]); + + } + if(bytes != null && bytes.length>limit) + { + sb.append("..."); + } + return sb.toString(); + } + } |