summaryrefslogtreecommitdiff
path: root/java/common
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-10-11 23:46:39 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-10-11 23:46:39 +0000
commita6dadb00efda0b8a33e4cec9b51be475b9e1e078 (patch)
treec13e76659c47e3f41d3128d9b0390afc990c9f3d /java/common
parent50512c42bacc8f76d297ab407b6b1b74d92bae91 (diff)
downloadqpid-python-a6dadb00efda0b8a33e4cec9b51be475b9e1e078.tar.gz
Move channel methods
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6125-ProtocolRefactoring@1631137 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQChannelException.java17
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java494
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java19
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java2
5 files changed, 292 insertions, 247 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
index 2264dee682..7ab422eb4f 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
@@ -49,10 +47,19 @@ public class AMQChannelException extends AMQException
}
- public AMQFrame getCloseFrame(int channel)
+ public int getClassId()
{
- return new AMQFrame(channel, _methodRegistry.createChannelCloseBody(getErrorCode() == null ? AMQConstant.INTERNAL_ERROR.getCode() : getErrorCode().getCode(),
- AMQShortString.validValueOf(getMessage()),_classId,_methodId));
+ return _classId;
+ }
+
+ public int getMethodId()
+ {
+ return _methodId;
+ }
+
+ public MethodRegistry getMethodRegistry()
+ {
+ return _methodRegistry;
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
index a05e3db139..49a88b6bc1 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
@@ -123,254 +123,264 @@ public class AMQDataBlockDecoder
throws AMQFrameDecodingException, IOException
{
final int classAndMethod = in.readInt();
- switch (classAndMethod)
+ int classId = classAndMethod >> 16;
+ int methodId = classAndMethod & 0xFFFF;
+ dispatcher.setCurrentMethod(classId, methodId);
+ try
{
- //CONNECTION_CLASS:
- case 0x000a000a:
- ConnectionStartBody.process(in, dispatcher);
- break;
- case 0x000a000b:
- ConnectionStartOkBody.process(in, dispatcher);
- break;
- case 0x000a0014:
- ConnectionSecureBody.process(in, dispatcher);
- break;
- case 0x000a0015:
- ConnectionSecureOkBody.process(in, dispatcher);
- break;
- case 0x000a001e:
- ConnectionTuneBody.process(in, dispatcher);
- break;
- case 0x000a001f:
- ConnectionTuneOkBody.process(in, dispatcher);
- break;
- case 0x000a0028:
- ConnectionOpenBody.process(in, dispatcher);
- break;
- case 0x000a0029:
- ConnectionOpenOkBody.process(in, dispatcher);
- break;
- case 0x000a002a:
- ConnectionRedirectBody.process(in, dispatcher);
- break;
- case 0x000a0032:
- if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0))
- {
+ switch (classAndMethod)
+ {
+ //CONNECTION_CLASS:
+ case 0x000a000a:
+ ConnectionStartBody.process(in, dispatcher);
+ break;
+ case 0x000a000b:
+ ConnectionStartOkBody.process(in, dispatcher);
+ break;
+ case 0x000a0014:
+ ConnectionSecureBody.process(in, dispatcher);
+ break;
+ case 0x000a0015:
+ ConnectionSecureOkBody.process(in, dispatcher);
+ break;
+ case 0x000a001e:
+ ConnectionTuneBody.process(in, dispatcher);
+ break;
+ case 0x000a001f:
+ ConnectionTuneOkBody.process(in, dispatcher);
+ break;
+ case 0x000a0028:
+ ConnectionOpenBody.process(in, dispatcher);
+ break;
+ case 0x000a0029:
+ ConnectionOpenOkBody.process(in, dispatcher);
+ break;
+ case 0x000a002a:
ConnectionRedirectBody.process(in, dispatcher);
- }
- else
- {
- ConnectionCloseBody.process(in, dispatcher);
- }
- break;
- case 0x000a0033:
- if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0))
- {
- throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF),
- dispatcher.getProtocolVersion());
- }
- else
- {
- dispatcher.receiveConnectionCloseOk();
- }
- break;
- case 0x000a003c:
- if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0))
- {
- ConnectionCloseBody.process(in, dispatcher);
- }
- else
- {
- throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF),
- dispatcher.getProtocolVersion());
- }
- break;
- case 0x000a003d:
- if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0))
- {
- dispatcher.receiveConnectionCloseOk();
- }
- else
- {
- throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF),
- dispatcher.getProtocolVersion());
- }
- break;
+ break;
+ case 0x000a0032:
+ if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0))
+ {
+ ConnectionRedirectBody.process(in, dispatcher);
+ }
+ else
+ {
+ ConnectionCloseBody.process(in, dispatcher);
+ }
+ break;
+ case 0x000a0033:
+ if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0))
+ {
+ throw newUnknownMethodException(classId, methodId,
+ dispatcher.getProtocolVersion());
+ }
+ else
+ {
+ dispatcher.receiveConnectionCloseOk();
+ }
+ break;
+ case 0x000a003c:
+ if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0))
+ {
+ ConnectionCloseBody.process(in, dispatcher);
+ }
+ else
+ {
+ throw newUnknownMethodException(classId, methodId,
+ dispatcher.getProtocolVersion());
+ }
+ break;
+ case 0x000a003d:
+ if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0))
+ {
+ dispatcher.receiveConnectionCloseOk();
+ }
+ else
+ {
+ throw newUnknownMethodException(classId, methodId,
+ dispatcher.getProtocolVersion());
+ }
+ break;
// CHANNEL_CLASS:
- case 0x0014000a:
- ChannelOpenBody.process(channelId, in, dispatcher);
- break;
- case 0x0014000b:
- ChannelOpenOkBody.process(channelId, in, dispatcher.getProtocolVersion(), dispatcher);
- break;
- case 0x00140014:
- ChannelFlowBody.process(channelId, in, dispatcher);
- break;
- case 0x00140015:
- ChannelFlowOkBody.process(channelId, in, dispatcher);
- break;
- case 0x0014001e:
- ChannelAlertBody.process(channelId, in, dispatcher);
- break;
- case 0x00140028:
- ChannelCloseBody.process(channelId, in, dispatcher);
- break;
- case 0x00140029:
- dispatcher.receiveChannelCloseOk(channelId);
- break;
-
- // ACCESS_CLASS:
-
- case 0x001e000a:
- AccessRequestBody.process(channelId, in, dispatcher);
- break;
- case 0x001e000b:
- AccessRequestOkBody.process(channelId, in, dispatcher);
- break;
-
- // EXCHANGE_CLASS:
-
- case 0x0028000a:
- ExchangeDeclareBody.process(channelId, in, dispatcher);
- break;
- case 0x0028000b:
- dispatcher.receiveExchangeDeclareOk(channelId);
- break;
- case 0x00280014:
- ExchangeDeleteBody.process(channelId, in, dispatcher);
- break;
- case 0x00280015:
- dispatcher.receiveExchangeDeleteOk(channelId);
- break;
- case 0x00280016:
- ExchangeBoundBody.process(channelId, in, dispatcher);
- break;
- case 0x00280017:
- ExchangeBoundOkBody.process(channelId, in, dispatcher);
- break;
-
-
- // QUEUE_CLASS:
-
- case 0x0032000a:
- QueueDeclareBody.process(channelId, in, dispatcher);
- break;
- case 0x0032000b:
- QueueDeclareOkBody.process(channelId, in, dispatcher);
- break;
- case 0x00320014:
- QueueBindBody.process(channelId, in, dispatcher);
- break;
- case 0x00320015:
- dispatcher.receiveQueueBindOk(channelId);
- break;
- case 0x0032001e:
- QueuePurgeBody.process(channelId, in, dispatcher);
- break;
- case 0x0032001f:
- QueuePurgeOkBody.process(channelId, in, dispatcher);
- break;
- case 0x00320028:
- QueueDeleteBody.process(channelId, in, dispatcher);
- break;
- case 0x00320029:
- QueueDeleteOkBody.process(channelId, in, dispatcher);
- break;
- case 0x00320032:
- QueueUnbindBody.process(channelId, in, dispatcher);
- break;
- case 0x00320033:
- dispatcher.receiveQueueUnbindOk(channelId);
- break;
-
-
- // BASIC_CLASS:
-
- case 0x003c000a:
- BasicQosBody.process(channelId, in, dispatcher);
- break;
- case 0x003c000b:
- dispatcher.receiveBasicQosOk(channelId);
- break;
- case 0x003c0014:
- BasicConsumeBody.process(channelId, in, dispatcher);
- break;
- case 0x003c0015:
- BasicConsumeOkBody.process(channelId, in, dispatcher);
- break;
- case 0x003c001e:
- BasicCancelBody.process(channelId, in, dispatcher);
- break;
- case 0x003c001f:
- BasicCancelOkBody.process(channelId, in, dispatcher);
- break;
- case 0x003c0028:
- BasicPublishBody.process(channelId, in, dispatcher);
- break;
- case 0x003c0032:
- BasicReturnBody.process(channelId, in, dispatcher);
- break;
- case 0x003c003c:
- BasicDeliverBody.process(channelId, in, dispatcher);
- break;
- case 0x003c0046:
- BasicGetBody.process(channelId, in, dispatcher);
- break;
- case 0x003c0047:
- BasicGetOkBody.process(channelId, in, dispatcher);
- break;
- case 0x003c0048:
- BasicGetEmptyBody.process(channelId, in, dispatcher);
- break;
- case 0x003c0050:
- BasicAckBody.process(channelId, in, dispatcher);
- break;
- case 0x003c005a:
- BasicRejectBody.process(channelId, in, dispatcher);
- break;
- case 0x003c0064:
- BasicRecoverBody.process(channelId, in, dispatcher.getProtocolVersion(), dispatcher);
- break;
- case 0x003c0065:
- dispatcher.receiveBasicRecoverSyncOk(channelId);
- break;
- case 0x003c0066:
- BasicRecoverSyncBody.process(channelId, in, dispatcher);
- break;
- case 0x003c006e:
- BasicRecoverSyncBody.process(channelId, in, dispatcher);
- break;
- case 0x003c006f:
- dispatcher.receiveBasicRecoverSyncOk(channelId);
- break;
-
- // TX_CLASS:
-
- case 0x005a000a:
- dispatcher.receiveTxSelect(channelId);
- break;
- case 0x005a000b:
- dispatcher.receiveTxSelectOk(channelId);
- break;
- case 0x005a0014:
- dispatcher.receiveTxCommit(channelId);
- break;
- case 0x005a0015:
- dispatcher.receiveTxCommitOk(channelId);
- break;
- case 0x005a001e:
- dispatcher.receiveTxRollback(channelId);
- break;
- case 0x005a001f:
- dispatcher.receiveTxRollbackOk(channelId);
- break;
-
- default:
- throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF),
- dispatcher.getProtocolVersion());
+ case 0x0014000a:
+ ChannelOpenBody.process(channelId, in, dispatcher);
+ break;
+ case 0x0014000b:
+ ChannelOpenOkBody.process(channelId, in, dispatcher.getProtocolVersion(), dispatcher);
+ break;
+ case 0x00140014:
+ ChannelFlowBody.process(channelId, in, dispatcher);
+ break;
+ case 0x00140015:
+ ChannelFlowOkBody.process(channelId, in, dispatcher);
+ break;
+ case 0x0014001e:
+ ChannelAlertBody.process(channelId, in, dispatcher);
+ break;
+ case 0x00140028:
+ ChannelCloseBody.process(channelId, in, dispatcher);
+ break;
+ case 0x00140029:
+ dispatcher.receiveChannelCloseOk(channelId);
+ break;
+
+ // ACCESS_CLASS:
+
+ case 0x001e000a:
+ AccessRequestBody.process(channelId, in, dispatcher);
+ break;
+ case 0x001e000b:
+ AccessRequestOkBody.process(channelId, in, dispatcher);
+ break;
+
+ // EXCHANGE_CLASS:
+
+ case 0x0028000a:
+ ExchangeDeclareBody.process(channelId, in, dispatcher);
+ break;
+ case 0x0028000b:
+ dispatcher.receiveExchangeDeclareOk(channelId);
+ break;
+ case 0x00280014:
+ ExchangeDeleteBody.process(channelId, in, dispatcher);
+ break;
+ case 0x00280015:
+ dispatcher.receiveExchangeDeleteOk(channelId);
+ break;
+ case 0x00280016:
+ ExchangeBoundBody.process(channelId, in, dispatcher);
+ break;
+ case 0x00280017:
+ ExchangeBoundOkBody.process(channelId, in, dispatcher);
+ break;
+
+
+ // QUEUE_CLASS:
+
+ case 0x0032000a:
+ QueueDeclareBody.process(channelId, in, dispatcher);
+ break;
+ case 0x0032000b:
+ QueueDeclareOkBody.process(channelId, in, dispatcher);
+ break;
+ case 0x00320014:
+ QueueBindBody.process(channelId, in, dispatcher);
+ break;
+ case 0x00320015:
+ dispatcher.receiveQueueBindOk(channelId);
+ break;
+ case 0x0032001e:
+ QueuePurgeBody.process(channelId, in, dispatcher);
+ break;
+ case 0x0032001f:
+ QueuePurgeOkBody.process(channelId, in, dispatcher);
+ break;
+ case 0x00320028:
+ QueueDeleteBody.process(channelId, in, dispatcher);
+ break;
+ case 0x00320029:
+ QueueDeleteOkBody.process(channelId, in, dispatcher);
+ break;
+ case 0x00320032:
+ QueueUnbindBody.process(channelId, in, dispatcher);
+ break;
+ case 0x00320033:
+ dispatcher.receiveQueueUnbindOk(channelId);
+ break;
+
+
+ // BASIC_CLASS:
+
+ case 0x003c000a:
+ BasicQosBody.process(channelId, in, dispatcher);
+ break;
+ case 0x003c000b:
+ dispatcher.receiveBasicQosOk(channelId);
+ break;
+ case 0x003c0014:
+ BasicConsumeBody.process(channelId, in, dispatcher);
+ break;
+ case 0x003c0015:
+ BasicConsumeOkBody.process(channelId, in, dispatcher);
+ break;
+ case 0x003c001e:
+ BasicCancelBody.process(channelId, in, dispatcher);
+ break;
+ case 0x003c001f:
+ BasicCancelOkBody.process(channelId, in, dispatcher);
+ break;
+ case 0x003c0028:
+ BasicPublishBody.process(channelId, in, dispatcher);
+ break;
+ case 0x003c0032:
+ BasicReturnBody.process(channelId, in, dispatcher);
+ break;
+ case 0x003c003c:
+ BasicDeliverBody.process(channelId, in, dispatcher);
+ break;
+ case 0x003c0046:
+ BasicGetBody.process(channelId, in, dispatcher);
+ break;
+ case 0x003c0047:
+ BasicGetOkBody.process(channelId, in, dispatcher);
+ break;
+ case 0x003c0048:
+ BasicGetEmptyBody.process(channelId, in, dispatcher);
+ break;
+ case 0x003c0050:
+ BasicAckBody.process(channelId, in, dispatcher);
+ break;
+ case 0x003c005a:
+ BasicRejectBody.process(channelId, in, dispatcher);
+ break;
+ case 0x003c0064:
+ BasicRecoverBody.process(channelId, in, dispatcher.getProtocolVersion(), dispatcher);
+ break;
+ case 0x003c0065:
+ dispatcher.receiveBasicRecoverSyncOk(channelId);
+ break;
+ case 0x003c0066:
+ BasicRecoverSyncBody.process(channelId, in, dispatcher);
+ break;
+ case 0x003c006e:
+ BasicRecoverSyncBody.process(channelId, in, dispatcher);
+ break;
+ case 0x003c006f:
+ dispatcher.receiveBasicRecoverSyncOk(channelId);
+ break;
+
+ // TX_CLASS:
+
+ case 0x005a000a:
+ dispatcher.receiveTxSelect(channelId);
+ break;
+ case 0x005a000b:
+ dispatcher.receiveTxSelectOk(channelId);
+ break;
+ case 0x005a0014:
+ dispatcher.receiveTxCommit(channelId);
+ break;
+ case 0x005a0015:
+ dispatcher.receiveTxCommitOk(channelId);
+ break;
+ case 0x005a001e:
+ dispatcher.receiveTxRollback(channelId);
+ break;
+ case 0x005a001f:
+ dispatcher.receiveTxRollbackOk(channelId);
+ break;
+
+ default:
+ throw newUnknownMethodException(classId, methodId,
+ dispatcher.getProtocolVersion());
+ }
+ }
+ finally
+ {
+ dispatcher.setCurrentMethod(0,0);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java
index 869994561f..6b02b066ae 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java
@@ -38,6 +38,13 @@ public class ExchangeBoundOkBody extends AMQMethodBodyImpl implements EncodableA
public static final int CLASS_ID = 40;
public static final int METHOD_ID = 23;
+ public static final int OK = 0;
+ public static final int EXCHANGE_NOT_FOUND = 1;
+ public static final int QUEUE_NOT_FOUND = 2;
+ public static final int NO_BINDINGS = 3;
+ public static final int QUEUE_NOT_BOUND = 4;
+ public static final int NO_QUEUE_BOUND_WITH_RK = 5;
+ public static final int SPECIFIC_QUEUE_NOT_BOUND_WITH_RK = 6;
// Fields declared in specification
private final int _replyCode; // [replyCode]
diff --git a/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
index 348df8b24d..1ad0f3081b 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
@@ -28,6 +28,8 @@ public class FrameCreatingMethodProcessor implements MethodProcessor
private ProtocolVersion _protocolVersion;
private final List<AMQDataBlock> _processedMethods = new ArrayList<>();
+ private int _classId;
+ private int _methodId;
public FrameCreatingMethodProcessor(final ProtocolVersion protocolVersion)
{
@@ -522,4 +524,21 @@ public class FrameCreatingMethodProcessor implements MethodProcessor
{
_processedMethods.add(protocolInitiation);
}
+
+ @Override
+ public void setCurrentMethod(final int classId, final int methodId)
+ {
+ _classId = classId;
+ _methodId = methodId;
+ }
+
+ public int getClassId()
+ {
+ return _classId;
+ }
+
+ public int getMethodId()
+ {
+ return _methodId;
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java
index e995cbf181..0b08059631 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java
@@ -198,4 +198,6 @@ public interface MethodProcessor
void receiveMessageHeader(int channelId, BasicContentHeaderProperties properties, long bodySize);
void receiveProtocolHeader(ProtocolInitiation protocolInitiation);
+
+ void setCurrentMethod(int classId, int methodId);
}