From a6dadb00efda0b8a33e4cec9b51be475b9e1e078 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sat, 11 Oct 2014 23:46:39 +0000 Subject: Move channel methods git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6125-ProtocolRefactoring@1631137 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/AMQChannelException.java | 17 +- .../apache/qpid/framing/AMQDataBlockDecoder.java | 494 +++++++++++---------- .../apache/qpid/framing/ExchangeBoundOkBody.java | 7 + .../qpid/framing/FrameCreatingMethodProcessor.java | 19 + .../org/apache/qpid/framing/MethodProcessor.java | 2 + 5 files changed, 292 insertions(+), 247 deletions(-) (limited to 'java/common') diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java index 2264dee682..7ab422eb4f 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java @@ -20,8 +20,6 @@ */ package org.apache.qpid; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; @@ -49,10 +47,19 @@ public class AMQChannelException extends AMQException } - public AMQFrame getCloseFrame(int channel) + public int getClassId() { - return new AMQFrame(channel, _methodRegistry.createChannelCloseBody(getErrorCode() == null ? AMQConstant.INTERNAL_ERROR.getCode() : getErrorCode().getCode(), - AMQShortString.validValueOf(getMessage()),_classId,_methodId)); + return _classId; + } + + public int getMethodId() + { + return _methodId; + } + + public MethodRegistry getMethodRegistry() + { + return _methodRegistry; } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index a05e3db139..49a88b6bc1 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -123,254 +123,264 @@ public class AMQDataBlockDecoder throws AMQFrameDecodingException, IOException { final int classAndMethod = in.readInt(); - switch (classAndMethod) + int classId = classAndMethod >> 16; + int methodId = classAndMethod & 0xFFFF; + dispatcher.setCurrentMethod(classId, methodId); + try { - //CONNECTION_CLASS: - case 0x000a000a: - ConnectionStartBody.process(in, dispatcher); - break; - case 0x000a000b: - ConnectionStartOkBody.process(in, dispatcher); - break; - case 0x000a0014: - ConnectionSecureBody.process(in, dispatcher); - break; - case 0x000a0015: - ConnectionSecureOkBody.process(in, dispatcher); - break; - case 0x000a001e: - ConnectionTuneBody.process(in, dispatcher); - break; - case 0x000a001f: - ConnectionTuneOkBody.process(in, dispatcher); - break; - case 0x000a0028: - ConnectionOpenBody.process(in, dispatcher); - break; - case 0x000a0029: - ConnectionOpenOkBody.process(in, dispatcher); - break; - case 0x000a002a: - ConnectionRedirectBody.process(in, dispatcher); - break; - case 0x000a0032: - if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) - { + switch (classAndMethod) + { + //CONNECTION_CLASS: + case 0x000a000a: + ConnectionStartBody.process(in, dispatcher); + break; + case 0x000a000b: + ConnectionStartOkBody.process(in, dispatcher); + break; + case 0x000a0014: + ConnectionSecureBody.process(in, dispatcher); + break; + case 0x000a0015: + ConnectionSecureOkBody.process(in, dispatcher); + break; + case 0x000a001e: + ConnectionTuneBody.process(in, dispatcher); + break; + case 0x000a001f: + ConnectionTuneOkBody.process(in, dispatcher); + break; + case 0x000a0028: + ConnectionOpenBody.process(in, dispatcher); + break; + case 0x000a0029: + ConnectionOpenOkBody.process(in, dispatcher); + break; + case 0x000a002a: ConnectionRedirectBody.process(in, dispatcher); - } - else - { - ConnectionCloseBody.process(in, dispatcher); - } - break; - case 0x000a0033: - if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) - { - throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), - dispatcher.getProtocolVersion()); - } - else - { - dispatcher.receiveConnectionCloseOk(); - } - break; - case 0x000a003c: - if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) - { - ConnectionCloseBody.process(in, dispatcher); - } - else - { - throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), - dispatcher.getProtocolVersion()); - } - break; - case 0x000a003d: - if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) - { - dispatcher.receiveConnectionCloseOk(); - } - else - { - throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), - dispatcher.getProtocolVersion()); - } - break; + break; + case 0x000a0032: + if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) + { + ConnectionRedirectBody.process(in, dispatcher); + } + else + { + ConnectionCloseBody.process(in, dispatcher); + } + break; + case 0x000a0033: + if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) + { + throw newUnknownMethodException(classId, methodId, + dispatcher.getProtocolVersion()); + } + else + { + dispatcher.receiveConnectionCloseOk(); + } + break; + case 0x000a003c: + if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) + { + ConnectionCloseBody.process(in, dispatcher); + } + else + { + throw newUnknownMethodException(classId, methodId, + dispatcher.getProtocolVersion()); + } + break; + case 0x000a003d: + if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) + { + dispatcher.receiveConnectionCloseOk(); + } + else + { + throw newUnknownMethodException(classId, methodId, + dispatcher.getProtocolVersion()); + } + break; // CHANNEL_CLASS: - case 0x0014000a: - ChannelOpenBody.process(channelId, in, dispatcher); - break; - case 0x0014000b: - ChannelOpenOkBody.process(channelId, in, dispatcher.getProtocolVersion(), dispatcher); - break; - case 0x00140014: - ChannelFlowBody.process(channelId, in, dispatcher); - break; - case 0x00140015: - ChannelFlowOkBody.process(channelId, in, dispatcher); - break; - case 0x0014001e: - ChannelAlertBody.process(channelId, in, dispatcher); - break; - case 0x00140028: - ChannelCloseBody.process(channelId, in, dispatcher); - break; - case 0x00140029: - dispatcher.receiveChannelCloseOk(channelId); - break; - - // ACCESS_CLASS: - - case 0x001e000a: - AccessRequestBody.process(channelId, in, dispatcher); - break; - case 0x001e000b: - AccessRequestOkBody.process(channelId, in, dispatcher); - break; - - // EXCHANGE_CLASS: - - case 0x0028000a: - ExchangeDeclareBody.process(channelId, in, dispatcher); - break; - case 0x0028000b: - dispatcher.receiveExchangeDeclareOk(channelId); - break; - case 0x00280014: - ExchangeDeleteBody.process(channelId, in, dispatcher); - break; - case 0x00280015: - dispatcher.receiveExchangeDeleteOk(channelId); - break; - case 0x00280016: - ExchangeBoundBody.process(channelId, in, dispatcher); - break; - case 0x00280017: - ExchangeBoundOkBody.process(channelId, in, dispatcher); - break; - - - // QUEUE_CLASS: - - case 0x0032000a: - QueueDeclareBody.process(channelId, in, dispatcher); - break; - case 0x0032000b: - QueueDeclareOkBody.process(channelId, in, dispatcher); - break; - case 0x00320014: - QueueBindBody.process(channelId, in, dispatcher); - break; - case 0x00320015: - dispatcher.receiveQueueBindOk(channelId); - break; - case 0x0032001e: - QueuePurgeBody.process(channelId, in, dispatcher); - break; - case 0x0032001f: - QueuePurgeOkBody.process(channelId, in, dispatcher); - break; - case 0x00320028: - QueueDeleteBody.process(channelId, in, dispatcher); - break; - case 0x00320029: - QueueDeleteOkBody.process(channelId, in, dispatcher); - break; - case 0x00320032: - QueueUnbindBody.process(channelId, in, dispatcher); - break; - case 0x00320033: - dispatcher.receiveQueueUnbindOk(channelId); - break; - - - // BASIC_CLASS: - - case 0x003c000a: - BasicQosBody.process(channelId, in, dispatcher); - break; - case 0x003c000b: - dispatcher.receiveBasicQosOk(channelId); - break; - case 0x003c0014: - BasicConsumeBody.process(channelId, in, dispatcher); - break; - case 0x003c0015: - BasicConsumeOkBody.process(channelId, in, dispatcher); - break; - case 0x003c001e: - BasicCancelBody.process(channelId, in, dispatcher); - break; - case 0x003c001f: - BasicCancelOkBody.process(channelId, in, dispatcher); - break; - case 0x003c0028: - BasicPublishBody.process(channelId, in, dispatcher); - break; - case 0x003c0032: - BasicReturnBody.process(channelId, in, dispatcher); - break; - case 0x003c003c: - BasicDeliverBody.process(channelId, in, dispatcher); - break; - case 0x003c0046: - BasicGetBody.process(channelId, in, dispatcher); - break; - case 0x003c0047: - BasicGetOkBody.process(channelId, in, dispatcher); - break; - case 0x003c0048: - BasicGetEmptyBody.process(channelId, in, dispatcher); - break; - case 0x003c0050: - BasicAckBody.process(channelId, in, dispatcher); - break; - case 0x003c005a: - BasicRejectBody.process(channelId, in, dispatcher); - break; - case 0x003c0064: - BasicRecoverBody.process(channelId, in, dispatcher.getProtocolVersion(), dispatcher); - break; - case 0x003c0065: - dispatcher.receiveBasicRecoverSyncOk(channelId); - break; - case 0x003c0066: - BasicRecoverSyncBody.process(channelId, in, dispatcher); - break; - case 0x003c006e: - BasicRecoverSyncBody.process(channelId, in, dispatcher); - break; - case 0x003c006f: - dispatcher.receiveBasicRecoverSyncOk(channelId); - break; - - // TX_CLASS: - - case 0x005a000a: - dispatcher.receiveTxSelect(channelId); - break; - case 0x005a000b: - dispatcher.receiveTxSelectOk(channelId); - break; - case 0x005a0014: - dispatcher.receiveTxCommit(channelId); - break; - case 0x005a0015: - dispatcher.receiveTxCommitOk(channelId); - break; - case 0x005a001e: - dispatcher.receiveTxRollback(channelId); - break; - case 0x005a001f: - dispatcher.receiveTxRollbackOk(channelId); - break; - - default: - throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), - dispatcher.getProtocolVersion()); + case 0x0014000a: + ChannelOpenBody.process(channelId, in, dispatcher); + break; + case 0x0014000b: + ChannelOpenOkBody.process(channelId, in, dispatcher.getProtocolVersion(), dispatcher); + break; + case 0x00140014: + ChannelFlowBody.process(channelId, in, dispatcher); + break; + case 0x00140015: + ChannelFlowOkBody.process(channelId, in, dispatcher); + break; + case 0x0014001e: + ChannelAlertBody.process(channelId, in, dispatcher); + break; + case 0x00140028: + ChannelCloseBody.process(channelId, in, dispatcher); + break; + case 0x00140029: + dispatcher.receiveChannelCloseOk(channelId); + break; + + // ACCESS_CLASS: + + case 0x001e000a: + AccessRequestBody.process(channelId, in, dispatcher); + break; + case 0x001e000b: + AccessRequestOkBody.process(channelId, in, dispatcher); + break; + + // EXCHANGE_CLASS: + + case 0x0028000a: + ExchangeDeclareBody.process(channelId, in, dispatcher); + break; + case 0x0028000b: + dispatcher.receiveExchangeDeclareOk(channelId); + break; + case 0x00280014: + ExchangeDeleteBody.process(channelId, in, dispatcher); + break; + case 0x00280015: + dispatcher.receiveExchangeDeleteOk(channelId); + break; + case 0x00280016: + ExchangeBoundBody.process(channelId, in, dispatcher); + break; + case 0x00280017: + ExchangeBoundOkBody.process(channelId, in, dispatcher); + break; + + + // QUEUE_CLASS: + + case 0x0032000a: + QueueDeclareBody.process(channelId, in, dispatcher); + break; + case 0x0032000b: + QueueDeclareOkBody.process(channelId, in, dispatcher); + break; + case 0x00320014: + QueueBindBody.process(channelId, in, dispatcher); + break; + case 0x00320015: + dispatcher.receiveQueueBindOk(channelId); + break; + case 0x0032001e: + QueuePurgeBody.process(channelId, in, dispatcher); + break; + case 0x0032001f: + QueuePurgeOkBody.process(channelId, in, dispatcher); + break; + case 0x00320028: + QueueDeleteBody.process(channelId, in, dispatcher); + break; + case 0x00320029: + QueueDeleteOkBody.process(channelId, in, dispatcher); + break; + case 0x00320032: + QueueUnbindBody.process(channelId, in, dispatcher); + break; + case 0x00320033: + dispatcher.receiveQueueUnbindOk(channelId); + break; + + + // BASIC_CLASS: + + case 0x003c000a: + BasicQosBody.process(channelId, in, dispatcher); + break; + case 0x003c000b: + dispatcher.receiveBasicQosOk(channelId); + break; + case 0x003c0014: + BasicConsumeBody.process(channelId, in, dispatcher); + break; + case 0x003c0015: + BasicConsumeOkBody.process(channelId, in, dispatcher); + break; + case 0x003c001e: + BasicCancelBody.process(channelId, in, dispatcher); + break; + case 0x003c001f: + BasicCancelOkBody.process(channelId, in, dispatcher); + break; + case 0x003c0028: + BasicPublishBody.process(channelId, in, dispatcher); + break; + case 0x003c0032: + BasicReturnBody.process(channelId, in, dispatcher); + break; + case 0x003c003c: + BasicDeliverBody.process(channelId, in, dispatcher); + break; + case 0x003c0046: + BasicGetBody.process(channelId, in, dispatcher); + break; + case 0x003c0047: + BasicGetOkBody.process(channelId, in, dispatcher); + break; + case 0x003c0048: + BasicGetEmptyBody.process(channelId, in, dispatcher); + break; + case 0x003c0050: + BasicAckBody.process(channelId, in, dispatcher); + break; + case 0x003c005a: + BasicRejectBody.process(channelId, in, dispatcher); + break; + case 0x003c0064: + BasicRecoverBody.process(channelId, in, dispatcher.getProtocolVersion(), dispatcher); + break; + case 0x003c0065: + dispatcher.receiveBasicRecoverSyncOk(channelId); + break; + case 0x003c0066: + BasicRecoverSyncBody.process(channelId, in, dispatcher); + break; + case 0x003c006e: + BasicRecoverSyncBody.process(channelId, in, dispatcher); + break; + case 0x003c006f: + dispatcher.receiveBasicRecoverSyncOk(channelId); + break; + + // TX_CLASS: + + case 0x005a000a: + dispatcher.receiveTxSelect(channelId); + break; + case 0x005a000b: + dispatcher.receiveTxSelectOk(channelId); + break; + case 0x005a0014: + dispatcher.receiveTxCommit(channelId); + break; + case 0x005a0015: + dispatcher.receiveTxCommitOk(channelId); + break; + case 0x005a001e: + dispatcher.receiveTxRollback(channelId); + break; + case 0x005a001f: + dispatcher.receiveTxRollbackOk(channelId); + break; + + default: + throw newUnknownMethodException(classId, methodId, + dispatcher.getProtocolVersion()); + } + } + finally + { + dispatcher.setCurrentMethod(0,0); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java index 869994561f..6b02b066ae 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java @@ -38,6 +38,13 @@ public class ExchangeBoundOkBody extends AMQMethodBodyImpl implements EncodableA public static final int CLASS_ID = 40; public static final int METHOD_ID = 23; + public static final int OK = 0; + public static final int EXCHANGE_NOT_FOUND = 1; + public static final int QUEUE_NOT_FOUND = 2; + public static final int NO_BINDINGS = 3; + public static final int QUEUE_NOT_BOUND = 4; + public static final int NO_QUEUE_BOUND_WITH_RK = 5; + public static final int SPECIFIC_QUEUE_NOT_BOUND_WITH_RK = 6; // Fields declared in specification private final int _replyCode; // [replyCode] diff --git a/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java index 348df8b24d..1ad0f3081b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java +++ b/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java @@ -28,6 +28,8 @@ public class FrameCreatingMethodProcessor implements MethodProcessor private ProtocolVersion _protocolVersion; private final List _processedMethods = new ArrayList<>(); + private int _classId; + private int _methodId; public FrameCreatingMethodProcessor(final ProtocolVersion protocolVersion) { @@ -522,4 +524,21 @@ public class FrameCreatingMethodProcessor implements MethodProcessor { _processedMethods.add(protocolInitiation); } + + @Override + public void setCurrentMethod(final int classId, final int methodId) + { + _classId = classId; + _methodId = methodId; + } + + public int getClassId() + { + return _classId; + } + + public int getMethodId() + { + return _methodId; + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java index e995cbf181..0b08059631 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java +++ b/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java @@ -198,4 +198,6 @@ public interface MethodProcessor void receiveMessageHeader(int channelId, BasicContentHeaderProperties properties, long bodySize); void receiveProtocolHeader(ProtocolInitiation protocolInitiation); + + void setCurrentMethod(int classId, int methodId); } -- cgit v1.2.1