summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-10-10 09:54:36 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-10-10 09:54:36 +0000
commit89b0cfc3fd36bd8679c3f9343289c3dd98f5394f (patch)
treecc287c92cfe5d32ef19d8aaad1238862d385ab25
parent146fcaecb92b7579fa6927dceecfe32ab92378c4 (diff)
downloadqpid-python-89b0cfc3fd36bd8679c3f9343289c3dd98f5394f.tar.gz
More refactoring
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6125-ProtocolRefactoring@1630745 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java4
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java42
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java12
-rw-r--r--java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java24
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java269
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java14
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java464
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java230
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java6
-rw-r--r--java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java32
-rw-r--r--java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java23
59 files changed, 760 insertions, 614 deletions
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index d9aa6e2d11..0c8a63eb5b 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -30,8 +30,6 @@ import java.util.Arrays;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10;
@@ -241,8 +239,6 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
private ContentHeaderBody createContentHeaderBody_0_8(BasicContentHeaderProperties props, int length)
{
- MethodRegistry methodRegistry = new MethodRegistry(ProtocolVersion.v0_9);
- int classForBasic = methodRegistry.createBasicQosOkBody().getClazz();
return new ContentHeaderBody(props, length);
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 9d92337a62..1d0c0a9b25 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -133,6 +133,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
/* AMQP Version for this session */
private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
private final MethodRegistry _methodRegistry = new MethodRegistry(_protocolVersion);
+ private final FrameCreatingMethodProcessor _methodProcessor = new FrameCreatingMethodProcessor(_protocolVersion);
private final List<Action<? super AMQProtocolEngine>> _taskList =
new CopyOnWriteArrayList<Action<? super AMQProtocolEngine>>();
@@ -185,7 +186,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_transport = transport;
_maxNoOfChannels = broker.getConnection_sessionCountLimit();
_receivedLock = new ReentrantLock();
- _decoder = new AMQDecoder(true, _methodRegistry);
+ _decoder = new AMQDecoder(true, _methodProcessor);
_connectionID = connectionId;
_logSubject = new ConnectionLogSubject(this);
@@ -296,10 +297,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_readBytes += msg.remaining();
_receivedLock.lock();
+ List<AMQDataBlock> processedMethods = _methodProcessor.getProcessedMethods();
try
{
- final ArrayList<AMQDataBlock> dataBlocks = _decoder.decodeBuffer(msg);
- for (AMQDataBlock dataBlock : dataBlocks)
+ _decoder.decodeBuffer(msg);
+ for (AMQDataBlock dataBlock : processedMethods)
{
try
{
@@ -320,6 +322,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
break;
}
}
+ processedMethods.clear();
receivedComplete();
}
catch (ConnectionScopedRuntimeException e)
@@ -349,6 +352,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
finally
{
+ processedMethods.clear();
_receivedLock.unlock();
}
return null;
@@ -1089,13 +1093,32 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private void closeConnection(int channelId, AMQConnectionException e)
{
- try
+
+ if (_logger.isInfoEnabled())
{
- if (_logger.isInfoEnabled())
- {
- _logger.info("Closing connection due to: " + e);
- }
+ _logger.info("Closing connection due to: " + e);
+ }
+ closeConnection(channelId, e.getCloseFrame());
+ }
+
+
+ void closeConnection(AMQConstant errorCode,
+ String message, int channelId,
+ int classId,
+ int methodId)
+ {
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing connection due to: " + message);
+ }
+ closeConnection(channelId, new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(), AMQShortString.validValueOf(message), classId, methodId)));
+ }
+ private void closeConnection(int channelId, AMQFrame frame)
+ {
+ try
+ {
markChannelAwaitingCloseOk(channelId);
closeSession();
}
@@ -1103,7 +1126,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
try
{
- writeFrame(e.getCloseFrame());
+ writeFrame(frame);
}
finally
{
@@ -1208,6 +1231,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
_protocolVersion = pv;
_methodRegistry.setProtocolVersion(_protocolVersion);
+ _methodProcessor.setProtocolVersion(_protocolVersion);
_protocolOutputConverter = new ProtocolOutputConverterImpl(this);
_dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(this);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 0bf83fe301..695b7c3253 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -23,8 +23,8 @@ package org.apache.qpid.client.protocol;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
@@ -193,7 +193,7 @@ public class AMQProtocolHandler implements ProtocolEngine
_connection = con;
_protocolSession = new AMQProtocolSession(this, _connection);
_stateManager = new AMQStateManager(_protocolSession);
- _decoder = new AMQDecoder(false, _protocolSession.getMethodRegistry());
+ _decoder = new AMQDecoder(false, _protocolSession.getMethodProcessor());
_failoverHandler = new FailoverHandler(this);
}
@@ -459,9 +459,10 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_readBytes += msg.remaining();
_lastReadTime = System.currentTimeMillis();
+ final List<AMQDataBlock> dataBlocks = _protocolSession.getMethodProcessor().getProcessedMethods();
try
{
- final ArrayList<AMQDataBlock> dataBlocks = _decoder.decodeBuffer(msg);
+ _decoder.decodeBuffer(msg);
// Decode buffer
int size = dataBlocks.size();
@@ -511,6 +512,10 @@ public class AMQProtocolHandler implements ProtocolEngine
propagateExceptionToFrameListeners(e);
exception(e);
}
+ finally
+ {
+ dataBlocks.clear();
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 8a16c1c8a5..2fbb13079e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -44,6 +44,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FrameCreatingMethodProcessor;
import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.framing.MethodDispatcher;
import org.apache.qpid.framing.MethodRegistry;
@@ -91,6 +92,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
private final MethodRegistry _methodRegistry =
new MethodRegistry(ProtocolVersion.getLatestSupportedVersion());
+ private final FrameCreatingMethodProcessor _methodProcessor =
+ new FrameCreatingMethodProcessor(ProtocolVersion.getLatestSupportedVersion());
+
private MethodDispatcher _methodDispatcher;
private final AMQConnection _connection;
@@ -416,7 +420,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
_logger.debug("Setting ProtocolVersion to :" + pv);
}
_protocolVersion = pv;
- _methodRegistry.setProtocolVersion(pv);;
+ _methodRegistry.setProtocolVersion(pv);
+ _methodProcessor.setProtocolVersion(pv);
_methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this);
}
@@ -549,4 +554,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
_protocolHandler.setMaxFrameSize(frameMax);
}
+
+ public FrameCreatingMethodProcessor getMethodProcessor()
+ {
+ return _methodProcessor;
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
index 5663a2c58c..b7904303b5 100644
--- a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
+++ b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
@@ -30,14 +30,13 @@ import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
-import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQDataBlockDecoder;
import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.framing.AMQProtocolVersionException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ByteArrayDataInput;
import org.apache.qpid.framing.EncodingUtils;
-import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.MethodProcessor;
import org.apache.qpid.framing.ProtocolInitiation;
/**
@@ -54,7 +53,8 @@ import org.apache.qpid.framing.ProtocolInitiation;
*/
public class AMQDecoder
{
- private final MethodRegistry _registry;
+ private final MethodProcessor _methodProcessor;
+
/** Holds the 'normal' AMQP data decoder. */
private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
@@ -73,12 +73,12 @@ public class AMQDecoder
* Creates a new AMQP decoder.
*
* @param expectProtocolInitiation <tt>true</tt> if this decoder needs to handle protocol initiation.
- * @param registry method registry
+ * @param methodProcessor method processor
*/
- public AMQDecoder(boolean expectProtocolInitiation, MethodRegistry registry)
+ public AMQDecoder(boolean expectProtocolInitiation, MethodProcessor methodProcessor)
{
_expectProtocolInitiation = expectProtocolInitiation;
- _registry = registry;
+ _methodProcessor = methodProcessor;
}
@@ -217,14 +217,13 @@ public class AMQDecoder
}
- public ArrayList<AMQDataBlock> decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
+ public void decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
{
- // get prior remaining data from accumulator
- ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>();
MarkableDataInput msg;
+ // get prior remaining data from accumulator
ByteArrayInputStream bais;
DataInput di;
if(!_remainingBufs.isEmpty())
@@ -258,9 +257,7 @@ public class AMQDecoder
enoughData = _dataBlockDecoder.decodable(msg);
if (enoughData)
{
- dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_registry.getProtocolVersion(),
- _registry.getMethodProcessor(),
- msg));
+ _dataBlockDecoder.processInput(_methodProcessor, msg);
}
}
else
@@ -268,7 +265,7 @@ public class AMQDecoder
enoughData = _piDecoder.decodable(msg);
if (enoughData)
{
- dataBlocks.add(new ProtocolInitiation(msg));
+ _methodProcessor.receiveProtocolHeader(new ProtocolInitiation(msg));
}
}
@@ -305,6 +302,5 @@ public class AMQDecoder
}
}
}
- return dataBlocks;
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
index 0a0a570bc3..a05e3db139 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
@@ -35,7 +35,8 @@ public class AMQDataBlockDecoder
private int _maxFrameSize = AMQConstant.FRAME_MIN_SIZE.getCode();
public AMQDataBlockDecoder()
- { }
+ {
+ }
public boolean decodable(MarkableDataInput in) throws AMQFrameDecodingException, IOException
{
@@ -52,9 +53,13 @@ public class AMQDataBlockDecoder
// Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt()
final long bodySize = in.readInt() & 0xffffffffL;
- if(bodySize > _maxFrameSize)
+ if (bodySize > _maxFrameSize)
{
- throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Incoming frame size of "+bodySize+" is larger than negotiated maximum of " + _maxFrameSize);
+ throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR,
+ "Incoming frame size of "
+ + bodySize
+ + " is larger than negotiated maximum of "
+ + _maxFrameSize);
}
in.reset();
@@ -62,9 +67,8 @@ public class AMQDataBlockDecoder
}
- public <T> T createAndPopulateFrame(ProtocolVersion pv,
- MethodProcessor<T> processor,
- MarkableDataInput in)
+ public void processInput(MethodProcessor processor,
+ MarkableDataInput in)
throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
{
final byte type = in.readByte();
@@ -75,24 +79,24 @@ public class AMQDataBlockDecoder
// bodySize can be zero
if ((channel < 0) || (bodySize < 0))
{
- throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Undecodable frame: type = " + type + " channel = " + channel
- + " bodySize = " + bodySize);
+ throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR,
+ "Undecodable frame: type = " + type + " channel = " + channel
+ + " bodySize = " + bodySize);
}
- T result;
- switch(type)
+ switch (type)
{
case 1:
- result = processMethod(channel, in, processor, pv);
+ processMethod(channel, in, processor);
break;
case 2:
- result = ContentHeaderBody.process(channel, in, processor, bodySize);
+ ContentHeaderBody.process(channel, in, processor, bodySize);
break;
case 3:
- result = ContentBody.process(channel, in, processor, bodySize);
+ ContentBody.process(channel, in, processor, bodySize);
break;
case 8:
- result = HeartbeatBody.process(channel, in, processor, bodySize);
+ HeartbeatBody.process(channel, in, processor, bodySize);
break;
default:
throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Unsupported frame type: " + type);
@@ -101,11 +105,11 @@ public class AMQDataBlockDecoder
byte marker = in.readByte();
if ((marker & 0xFF) != 0xCE)
{
- throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "End of frame marker not found. Read " + marker + " length=" + bodySize
- + " type=" + type);
+ throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR,
+ "End of frame marker not found. Read " + marker + " length=" + bodySize
+ + " type=" + type);
}
- return result;
}
public void setMaxFrameSize(final int maxFrameSize)
@@ -113,200 +117,277 @@ public class AMQDataBlockDecoder
_maxFrameSize = maxFrameSize;
}
- private <T> T processMethod(int channelId, MarkableDataInput in, MethodProcessor<T> dispatcher, ProtocolVersion protocolVersion)
+ private void processMethod(int channelId,
+ MarkableDataInput in,
+ MethodProcessor dispatcher)
throws AMQFrameDecodingException, IOException
{
final int classAndMethod = in.readInt();
-
switch (classAndMethod)
{
//CONNECTION_CLASS:
case 0x000a000a:
- return ConnectionStartBody.process(in, dispatcher);
+ ConnectionStartBody.process(in, dispatcher);
+ break;
case 0x000a000b:
- return ConnectionStartOkBody.process(in, dispatcher);
+ ConnectionStartOkBody.process(in, dispatcher);
+ break;
case 0x000a0014:
- return ConnectionSecureBody.process(in, dispatcher);
+ ConnectionSecureBody.process(in, dispatcher);
+ break;
case 0x000a0015:
- return ConnectionSecureOkBody.process(in, dispatcher);
+ ConnectionSecureOkBody.process(in, dispatcher);
+ break;
case 0x000a001e:
- return ConnectionTuneBody.process(in, dispatcher);
+ ConnectionTuneBody.process(in, dispatcher);
+ break;
case 0x000a001f:
- return ConnectionTuneOkBody.process(in, dispatcher);
+ ConnectionTuneOkBody.process(in, dispatcher);
+ break;
case 0x000a0028:
- return ConnectionOpenBody.process(in, dispatcher);
+ ConnectionOpenBody.process(in, dispatcher);
+ break;
case 0x000a0029:
- return ConnectionOpenOkBody.process(in, dispatcher);
+ ConnectionOpenOkBody.process(in, dispatcher);
+ break;
case 0x000a002a:
- return ConnectionRedirectBody.process(in, dispatcher);
+ ConnectionRedirectBody.process(in, dispatcher);
+ break;
case 0x000a0032:
- if (protocolVersion.equals(ProtocolVersion.v8_0))
+ if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0))
{
- return ConnectionRedirectBody.process(in, dispatcher);
+ ConnectionRedirectBody.process(in, dispatcher);
}
else
{
- return ConnectionCloseBody.process(in, dispatcher);
+ ConnectionCloseBody.process(in, dispatcher);
}
+ break;
case 0x000a0033:
- if (protocolVersion.equals(ProtocolVersion.v8_0))
+ if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0))
{
- throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), protocolVersion);
+ throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF),
+ dispatcher.getProtocolVersion());
}
else
{
- return dispatcher.connectionCloseOk();
+ dispatcher.receiveConnectionCloseOk();
}
+ break;
case 0x000a003c:
- if (protocolVersion.equals(ProtocolVersion.v8_0))
+ if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0))
{
- return ConnectionCloseBody.process(in, dispatcher);
+ ConnectionCloseBody.process(in, dispatcher);
}
else
{
- throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), protocolVersion);
+ throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF),
+ dispatcher.getProtocolVersion());
}
+ break;
case 0x000a003d:
- if (protocolVersion.equals(ProtocolVersion.v8_0))
+ if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0))
{
- return dispatcher.connectionCloseOk();
+ dispatcher.receiveConnectionCloseOk();
}
else
{
- throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), protocolVersion);
+ throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF),
+ dispatcher.getProtocolVersion());
}
+ break;
// CHANNEL_CLASS:
case 0x0014000a:
- return ChannelOpenBody.process(channelId, in, dispatcher);
+ ChannelOpenBody.process(channelId, in, dispatcher);
+ break;
case 0x0014000b:
- return ChannelOpenOkBody.process(channelId, in, protocolVersion, dispatcher);
+ ChannelOpenOkBody.process(channelId, in, dispatcher.getProtocolVersion(), dispatcher);
+ break;
case 0x00140014:
- return ChannelFlowBody.process(channelId, in, dispatcher);
+ ChannelFlowBody.process(channelId, in, dispatcher);
+ break;
case 0x00140015:
- return ChannelFlowOkBody.process(channelId, in, dispatcher);
+ ChannelFlowOkBody.process(channelId, in, dispatcher);
+ break;
case 0x0014001e:
- return ChannelAlertBody.process(channelId, in, dispatcher);
+ ChannelAlertBody.process(channelId, in, dispatcher);
+ break;
case 0x00140028:
- return ChannelCloseBody.process(channelId, in, dispatcher);
+ ChannelCloseBody.process(channelId, in, dispatcher);
+ break;
case 0x00140029:
- return dispatcher.channelCloseOk(channelId);
+ dispatcher.receiveChannelCloseOk(channelId);
+ break;
// ACCESS_CLASS:
case 0x001e000a:
- return AccessRequestBody.process(channelId, in, dispatcher);
+ AccessRequestBody.process(channelId, in, dispatcher);
+ break;
case 0x001e000b:
- return AccessRequestOkBody.process(channelId, in, dispatcher);
+ AccessRequestOkBody.process(channelId, in, dispatcher);
+ break;
// EXCHANGE_CLASS:
case 0x0028000a:
- return ExchangeDeclareBody.process(channelId, in, dispatcher);
+ ExchangeDeclareBody.process(channelId, in, dispatcher);
+ break;
case 0x0028000b:
- return dispatcher.exchangeDeclareOk(channelId);
+ dispatcher.receiveExchangeDeclareOk(channelId);
+ break;
case 0x00280014:
- return ExchangeDeleteBody.process(channelId, in, dispatcher);
+ ExchangeDeleteBody.process(channelId, in, dispatcher);
+ break;
case 0x00280015:
- return dispatcher.exchangeDeleteOk(channelId);
+ dispatcher.receiveExchangeDeleteOk(channelId);
+ break;
case 0x00280016:
- return ExchangeBoundBody.process(channelId, in, dispatcher);
+ ExchangeBoundBody.process(channelId, in, dispatcher);
+ break;
case 0x00280017:
- return ExchangeBoundOkBody.process(channelId, in, dispatcher);
+ ExchangeBoundOkBody.process(channelId, in, dispatcher);
+ break;
// QUEUE_CLASS:
case 0x0032000a:
- return QueueDeclareBody.process(channelId, in, dispatcher);
+ QueueDeclareBody.process(channelId, in, dispatcher);
+ break;
case 0x0032000b:
- return QueueDeclareOkBody.process(channelId, in, dispatcher);
+ QueueDeclareOkBody.process(channelId, in, dispatcher);
+ break;
case 0x00320014:
- return QueueBindBody.process(channelId, in, dispatcher);
+ QueueBindBody.process(channelId, in, dispatcher);
+ break;
case 0x00320015:
- return dispatcher.queueBindOk(channelId);
+ dispatcher.receiveQueueBindOk(channelId);
+ break;
case 0x0032001e:
- return QueuePurgeBody.process(channelId, in, dispatcher);
+ QueuePurgeBody.process(channelId, in, dispatcher);
+ break;
case 0x0032001f:
- return QueuePurgeOkBody.process(channelId, in, dispatcher);
+ QueuePurgeOkBody.process(channelId, in, dispatcher);
+ break;
case 0x00320028:
- return QueueDeleteBody.process(channelId, in, dispatcher);
+ QueueDeleteBody.process(channelId, in, dispatcher);
+ break;
case 0x00320029:
- return QueueDeleteOkBody.process(channelId, in, dispatcher);
+ QueueDeleteOkBody.process(channelId, in, dispatcher);
+ break;
case 0x00320032:
- return QueueUnbindBody.process(channelId, in, dispatcher);
+ QueueUnbindBody.process(channelId, in, dispatcher);
+ break;
case 0x00320033:
- return dispatcher.queueUnbindOk(channelId);
+ dispatcher.receiveQueueUnbindOk(channelId);
+ break;
// BASIC_CLASS:
case 0x003c000a:
- return BasicQosBody.process(channelId, in, dispatcher);
+ BasicQosBody.process(channelId, in, dispatcher);
+ break;
case 0x003c000b:
- return dispatcher.basicQosOk(channelId);
+ dispatcher.receiveBasicQosOk(channelId);
+ break;
case 0x003c0014:
- return BasicConsumeBody.process(channelId, in, dispatcher);
+ BasicConsumeBody.process(channelId, in, dispatcher);
+ break;
case 0x003c0015:
- return BasicConsumeOkBody.process(channelId, in, dispatcher);
+ BasicConsumeOkBody.process(channelId, in, dispatcher);
+ break;
case 0x003c001e:
- return BasicCancelBody.process(channelId, in, dispatcher);
+ BasicCancelBody.process(channelId, in, dispatcher);
+ break;
case 0x003c001f:
- return BasicCancelOkBody.process(channelId, in, dispatcher);
+ BasicCancelOkBody.process(channelId, in, dispatcher);
+ break;
case 0x003c0028:
- return BasicPublishBody.process(channelId, in, dispatcher);
+ BasicPublishBody.process(channelId, in, dispatcher);
+ break;
case 0x003c0032:
- return BasicReturnBody.process(channelId, in, dispatcher);
+ BasicReturnBody.process(channelId, in, dispatcher);
+ break;
case 0x003c003c:
- return BasicDeliverBody.process(channelId, in, dispatcher);
+ BasicDeliverBody.process(channelId, in, dispatcher);
+ break;
case 0x003c0046:
- return BasicGetBody.process(channelId, in, dispatcher);
+ BasicGetBody.process(channelId, in, dispatcher);
+ break;
case 0x003c0047:
- return BasicGetOkBody.process(channelId, in, dispatcher);
+ BasicGetOkBody.process(channelId, in, dispatcher);
+ break;
case 0x003c0048:
- return BasicGetEmptyBody.process(channelId, in, dispatcher);
+ BasicGetEmptyBody.process(channelId, in, dispatcher);
+ break;
case 0x003c0050:
- return BasicAckBody.process(channelId, in, dispatcher);
+ BasicAckBody.process(channelId, in, dispatcher);
+ break;
case 0x003c005a:
- return BasicRejectBody.process(channelId, in, dispatcher);
+ BasicRejectBody.process(channelId, in, dispatcher);
+ break;
case 0x003c0064:
- return BasicRecoverBody.process(channelId, in, protocolVersion, dispatcher);
+ BasicRecoverBody.process(channelId, in, dispatcher.getProtocolVersion(), dispatcher);
+ break;
case 0x003c0065:
- return dispatcher.basicRecoverSyncOk(channelId);
+ dispatcher.receiveBasicRecoverSyncOk(channelId);
+ break;
case 0x003c0066:
- return BasicRecoverSyncBody.process(channelId, in, dispatcher);
+ BasicRecoverSyncBody.process(channelId, in, dispatcher);
+ break;
case 0x003c006e:
- return BasicRecoverSyncBody.process(channelId, in, dispatcher);
+ BasicRecoverSyncBody.process(channelId, in, dispatcher);
+ break;
case 0x003c006f:
- return dispatcher.basicRecoverSyncOk(channelId);
+ dispatcher.receiveBasicRecoverSyncOk(channelId);
+ break;
// TX_CLASS:
case 0x005a000a:
- return dispatcher.txSelect(channelId);
+ dispatcher.receiveTxSelect(channelId);
+ break;
case 0x005a000b:
- return dispatcher.txSelectOk(channelId);
+ dispatcher.receiveTxSelectOk(channelId);
+ break;
case 0x005a0014:
- return dispatcher.txCommit(channelId);
+ dispatcher.receiveTxCommit(channelId);
+ break;
case 0x005a0015:
- return dispatcher.txCommitOk(channelId);
+ dispatcher.receiveTxCommitOk(channelId);
+ break;
case 0x005a001e:
- return dispatcher.txRollback(channelId);
+ dispatcher.receiveTxRollback(channelId);
+ break;
case 0x005a001f:
- return dispatcher.txRollbackOk(channelId);
+ dispatcher.receiveTxRollbackOk(channelId);
+ break;
default:
- throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), protocolVersion);
+ throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF),
+ dispatcher.getProtocolVersion());
}
}
- private AMQFrameDecodingException newUnknownMethodException(final int classId, final int methodId, ProtocolVersion protocolVersion)
+ private AMQFrameDecodingException newUnknownMethodException(final int classId,
+ final int methodId,
+ ProtocolVersion protocolVersion)
{
return new AMQFrameDecodingException(AMQConstant.COMMAND_INVALID,
- "Method " + methodId + " unknown in AMQP version " + protocolVersion
- + " (while trying to decode class " + classId + " method " + methodId + ".");
+ "Method "
+ + methodId
+ + " unknown in AMQP version "
+ + protocolVersion
+ + " (while trying to decode class "
+ + classId
+ + " method "
+ + methodId
+ + ".");
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
index 9a386d4eb4..ce2a5a1317 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
@@ -165,9 +165,9 @@ public class AccessRequestBody extends AMQMethodBodyImpl implements EncodableAMQ
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
AMQShortString realm = buffer.readAMQShortString();
byte bitfield = buffer.readByte();
@@ -176,6 +176,6 @@ public class AccessRequestBody extends AMQMethodBodyImpl implements EncodableAMQ
boolean active = (bitfield & 0x04) == 0x4 ;
boolean write = (bitfield & 0x08) == 0x8 ;
boolean read = (bitfield & 0x10) == 0x10 ;
- return dispatcher.accessRequest(channelId, realm, exclusive, passive, active, write, read);
+ dispatcher.receiveAccessRequest(channelId, realm, exclusive, passive, active, write, read);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
index 8439df9e92..10be4d45c8 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
@@ -95,10 +95,10 @@ public class AccessRequestOkBody extends AMQMethodBodyImpl implements EncodableA
return buf.toString();
}
- public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
throws IOException
{
int ticket = buffer.readUnsignedShort();
- return dispatcher.accessRequestOk(channelId, ticket);
+ dispatcher.receiveAccessRequestOk(channelId, ticket);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
index 956d10bf0a..70e3f10148 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
@@ -112,13 +112,13 @@ public class BasicAckBody extends AMQMethodBodyImpl implements EncodableAMQDataB
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
long deliveryTag = buffer.readLong();
boolean multiple = (buffer.readByte() & 0x01) != 0;
- return dispatcher.basicAck(channelId, deliveryTag, multiple);
+ dispatcher.receiveBasicAck(channelId, deliveryTag, multiple);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
index 1c619fd5d4..6f74b3870a 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
@@ -113,13 +113,13 @@ public class BasicCancelBody extends AMQMethodBodyImpl implements EncodableAMQDa
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
AMQShortString consumerTag = buffer.readAMQShortString();
boolean noWait = (buffer.readByte() & 0x01) == 0x01;
- return dispatcher.basicCancel(channelId, consumerTag, noWait);
+ dispatcher.receiveBasicCancel(channelId, consumerTag, noWait);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
index c85223cd4f..0e9bc52d66 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
@@ -96,10 +96,10 @@ public class BasicCancelOkBody extends AMQMethodBodyImpl implements EncodableAMQ
return buf.toString();
}
- public static <T> T process(final int channelId, final MarkableDataInput in, final MethodProcessor<T> dispatcher)
+ public static void process(final int channelId, final MarkableDataInput in, final MethodProcessor dispatcher)
throws IOException
{
AMQShortString consumerTag = in.readAMQShortString();
- return dispatcher.basicCancelOk(channelId, consumerTag);
+ dispatcher.receiveBasicCancelOk(channelId, consumerTag);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
index 1d6ec46c9a..94396418fe 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
@@ -191,7 +191,7 @@ public class BasicConsumeBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
- public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
throws IOException, AMQFrameDecodingException
{
@@ -205,6 +205,6 @@ public class BasicConsumeBody extends AMQMethodBodyImpl implements EncodableAMQD
boolean exclusive = (bitfield & 0x04) == 0x04;
boolean nowait = (bitfield & 0x08) == 0x08;
FieldTable arguments = EncodingUtils.readFieldTable(buffer);
- return dispatcher.basicConsume(channelId, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments);
+ dispatcher.receiveBasicConsume(channelId, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
index b019574a6b..d42c722fdf 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
@@ -96,10 +96,10 @@ public class BasicConsumeOkBody extends AMQMethodBodyImpl implements EncodableAM
return buf.toString();
}
- public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
throws IOException
{
AMQShortString consumerTag = buffer.readAMQShortString();
- return dispatcher.basicConsumeOk(channelId, consumerTag);
+ dispatcher.receiveBasicConsumeOk(channelId, consumerTag);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
index 76cd9bfff4..afa38d1852 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
@@ -152,9 +152,9 @@ public class BasicDeliverBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
AMQShortString consumerTag = buffer.readAMQShortString();
@@ -162,6 +162,6 @@ public class BasicDeliverBody extends AMQMethodBodyImpl implements EncodableAMQD
boolean redelivered = (buffer.readByte() & 0x01) != 0;
AMQShortString exchange = buffer.readAMQShortString();
AMQShortString routingKey = buffer.readAMQShortString();
- return dispatcher.basicDeliver(channelId, consumerTag, deliveryTag, redelivered, exchange, routingKey);
+ dispatcher.receiveBasicDeliver(channelId, consumerTag, deliveryTag, redelivered, exchange, routingKey);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java
index 2ebde34648..93429b97d8 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java
@@ -125,13 +125,13 @@ public class BasicGetBody extends AMQMethodBodyImpl implements EncodableAMQDataB
return buf.toString();
}
- public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
throws IOException
{
int ticket = buffer.readUnsignedShort();
AMQShortString queue = buffer.readAMQShortString();
boolean noAck = (buffer.readByte() & 0x01) != 0;
- return dispatcher.basicGet(channelId, queue, noAck);
+ dispatcher.receiveBasicGet(channelId, queue, noAck);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java
index 508c3f8e66..a42df6bcc7 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java
@@ -96,11 +96,11 @@ public class BasicGetEmptyBody extends AMQMethodBodyImpl implements EncodableAMQ
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
AMQShortString clusterId = buffer.readAMQShortString();
- return dispatcher.basicGetEmpty(channelId);
+ dispatcher.receiveBasicGetEmpty(channelId);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java
index 4020d8fb23..b8af656a35 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java
@@ -151,15 +151,15 @@ public class BasicGetOkBody extends AMQMethodBodyImpl implements EncodableAMQDat
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
long deliveryTag = buffer.readLong();
boolean redelivered = (buffer.readByte() & 0x01) != 0;
AMQShortString exchange = buffer.readAMQShortString();
AMQShortString routingKey = buffer.readAMQShortString();
long messageCount = EncodingUtils.readUnsignedInteger(buffer);
- return dispatcher.basicGetOk(channelId, deliveryTag, redelivered, exchange, routingKey, messageCount);
+ dispatcher.receiveBasicGetOk(channelId, deliveryTag, redelivered, exchange, routingKey, messageCount);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java
index 7920da8405..910942c2f1 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java
@@ -151,9 +151,9 @@ public class BasicPublishBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
int ticket = buffer.readUnsignedShort();
@@ -163,6 +163,6 @@ public class BasicPublishBody extends AMQMethodBodyImpl implements EncodableAMQD
boolean mandatory = (bitfield & 0x01) != 0;
boolean immediate = (bitfield & 0x02) != 0;
- return dispatcher.basicPublish(channelId, exchange, routingKey, mandatory, immediate);
+ dispatcher.receiveBasicPublish(channelId, exchange, routingKey, mandatory, immediate);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java
index 0843c5ccd7..fb6b6956c6 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java
@@ -124,14 +124,14 @@ public class BasicQosBody extends AMQMethodBodyImpl implements EncodableAMQDataB
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
long prefetchSize = EncodingUtils.readUnsignedInteger(buffer);
int prefetchCount = buffer.readUnsignedShort();
boolean global = (buffer.readByte() & 0x01) == 0x01;
- return dispatcher.basicQos(channelId, prefetchSize, prefetchCount, global);
+ dispatcher.receiveBasicQos(channelId, prefetchSize, prefetchCount, global);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java
index 739470c658..2519f25fbe 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java
@@ -100,14 +100,14 @@ public class BasicRecoverBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput in,
final ProtocolVersion protocolVersion,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
boolean requeue = (in.readByte() & 0x01) == 0x01;
boolean sync = (ProtocolVersion.v8_0.equals(protocolVersion));
- return dispatcher.basicRecover(channelId, requeue, sync);
+ dispatcher.receiveBasicRecover(channelId, requeue, sync);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java
index 5826bd9d16..16c9798977 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java
@@ -103,11 +103,11 @@ public class BasicRecoverSyncBody extends AMQMethodBodyImpl implements Encodable
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput in,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
boolean requeue = (in.readByte() & 0x01) == 0x01;
- return dispatcher.basicRecover(channelId, requeue, true);
+ dispatcher.receiveBasicRecover(channelId, requeue, true);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java
index 83f2727a51..8e1ebf4013 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java
@@ -112,13 +112,13 @@ public class BasicRejectBody extends AMQMethodBodyImpl implements EncodableAMQDa
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
long deliveryTag = buffer.readLong();
boolean requeue = (buffer.readByte() & 0x01) != 0;
- return dispatcher.basicReject(channelId, deliveryTag, requeue);
+ dispatcher.receiveBasicReject(channelId, deliveryTag, requeue);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java
index 67d6c77312..cff9914705 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java
@@ -134,15 +134,15 @@ public class BasicReturnBody extends AMQMethodBodyImpl implements EncodableAMQDa
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
int replyCode = buffer.readUnsignedShort();
AMQShortString replyText = buffer.readAMQShortString();
AMQShortString exchange = buffer.readAMQShortString();
AMQShortString routingKey = buffer.readAMQShortString();
- return dispatcher.basicReturn(channelId, replyCode, replyText, exchange, routingKey);
+ dispatcher.receiveBasicReturn(channelId, replyCode, replyText, exchange, routingKey);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java
index d5c535b099..11dcffc175 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java
@@ -121,13 +121,13 @@ public class ChannelAlertBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
- public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
throws IOException, AMQFrameDecodingException
{
int replyCode = buffer.readUnsignedShort();
AMQShortString replyText = buffer.readAMQShortString();
FieldTable details = EncodingUtils.readFieldTable(buffer);
- return dispatcher.channelAlert(channelId, replyCode, replyText, details);
+ dispatcher.receiveChannelAlert(channelId, replyCode, replyText, details);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java
index ea1536ed2b..a4f54fbe7d 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java
@@ -132,15 +132,15 @@ public class ChannelCloseBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
int replyCode = buffer.readUnsignedShort();
AMQShortString replyText = buffer.readAMQShortString();
int classId = buffer.readUnsignedShort();
int methodId = buffer.readUnsignedShort();
- return dispatcher.channelClose(channelId, replyCode, replyText, classId, methodId);
+ dispatcher.receiveChannelClose(channelId, replyCode, replyText, classId, methodId);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java
index e70eb0ea35..c975744d9f 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java
@@ -92,11 +92,11 @@ public class ChannelFlowBody extends AMQMethodBodyImpl implements EncodableAMQDa
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
boolean active = (buffer.readByte() & 0x01) == 0x01;
- return dispatcher.channelFlow(channelId, active);
+ dispatcher.receiveChannelFlow(channelId, active);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java
index 13bdf332d2..a62c6155f8 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java
@@ -93,10 +93,10 @@ public class ChannelFlowOkBody extends AMQMethodBodyImpl implements EncodableAMQ
return buf.toString();
}
- public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
throws IOException
{
boolean active = (buffer.readByte() & 0x01) == 0x01;
- return dispatcher.channelFlowOk(channelId, active);
+ dispatcher.receiveChannelFlowOk(channelId, active);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java
index f96eb9344b..9da45d3d70 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java
@@ -82,11 +82,11 @@ public class ChannelOpenBody extends AMQMethodBodyImpl implements EncodableAMQDa
return "[ChannelOpenBody] ";
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
buffer.readAMQShortString();
- return dispatcher.channelOpen(channelId);
+ dispatcher.receiveChannelOpen(channelId);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java
index 5cf4d91970..775a08fbd4 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java
@@ -96,16 +96,16 @@ public class ChannelOpenOkBody extends AMQMethodBodyImpl implements EncodableAMQ
return "[ChannelOpenOkBody]";
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput in,
final ProtocolVersion protocolVersion,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
if(!ProtocolVersion.v8_0.equals(protocolVersion))
{
EncodingUtils.readBytes(in);
}
- return dispatcher.channelOpenOk(channelId);
+ dispatcher.receiveChannelOpenOk(channelId);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java
index 02f214cee9..546cf5fa0a 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java
@@ -134,12 +134,12 @@ public class ConnectionCloseBody extends AMQMethodBodyImpl implements EncodableA
return buf.toString();
}
- public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException
{
int replyCode = buffer.readUnsignedShort();
AMQShortString replyText = buffer.readAMQShortString();
int classId = buffer.readUnsignedShort();
int methodId = buffer.readUnsignedShort();
- return dispatcher.connectionClose(replyCode, replyText, classId, methodId);
+ dispatcher.receiveConnectionClose(replyCode, replyText, classId, methodId);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java
index f9f55446dd..0e685deb7c 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java
@@ -121,12 +121,12 @@ public class ConnectionOpenBody extends AMQMethodBodyImpl implements EncodableAM
return buf.toString();
}
- public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException
{
AMQShortString virtualHost = buffer.readAMQShortString();
AMQShortString capabilities = buffer.readAMQShortString();
boolean insist = (buffer.readByte() & 0x01) == 0x01;
- return dispatcher.connectionOpen(virtualHost, capabilities, insist);
+ dispatcher.receiveConnectionOpen(virtualHost, capabilities, insist);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java
index 3f04da7a29..6d1e80c624 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java
@@ -96,10 +96,10 @@ public class ConnectionOpenOkBody extends AMQMethodBodyImpl implements Encodable
return buf.toString();
}
- public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException
{
AMQShortString knownHosts = buffer.readAMQShortString();
- return dispatcher.connectionOpenOk(knownHosts);
+ dispatcher.receiveConnectionOpenOk(knownHosts);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java
index 80c655683f..a9b9a43b1a 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java
@@ -108,10 +108,10 @@ public class ConnectionRedirectBody extends AMQMethodBodyImpl implements Encodab
return buf.toString();
}
- public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException
{
AMQShortString host = buffer.readAMQShortString();
AMQShortString knownHosts = buffer.readAMQShortString();
- return dispatcher.connectionRedirect(host, knownHosts);
+ dispatcher.receiveConnectionRedirect(host, knownHosts);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java
index ca208d5a89..1f7f2b0221 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java
@@ -96,11 +96,11 @@ public class ConnectionSecureBody extends AMQMethodBodyImpl implements Encodable
return buf.toString();
}
- public static <T> T process(final MarkableDataInput in, final MethodProcessor<T> dispatcher)
+ public static void process(final MarkableDataInput in, final MethodProcessor dispatcher)
throws IOException, AMQFrameDecodingException
{
byte[] challenge = EncodingUtils.readBytes(in);
- return dispatcher.connectionSecure(challenge);
+ dispatcher.receiveConnectionSecure(challenge);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java
index 0a2bfa613e..9a4668a9c7 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java
@@ -96,9 +96,9 @@ public class ConnectionSecureOkBody extends AMQMethodBodyImpl implements Encodab
return buf.toString();
}
- public static <T> T process(final MarkableDataInput in, final MethodProcessor<T> dispatcher) throws IOException
+ public static void process(final MarkableDataInput in, final MethodProcessor dispatcher) throws IOException
{
byte[] response = EncodingUtils.readBytes(in);
- return dispatcher.connectionSecureOk(response);
+ dispatcher.receiveConnectionSecureOk(response);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java
index 17a568d737..4f47f0632f 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java
@@ -136,7 +136,7 @@ public class ConnectionStartBody extends AMQMethodBodyImpl implements EncodableA
return buf.toString();
}
- public static <T> T process(final MarkableDataInput in, final MethodProcessor<T> dispatcher)
+ public static void process(final MarkableDataInput in, final MethodProcessor dispatcher)
throws IOException, AMQFrameDecodingException
{
short versionMajor = (short) in.readUnsignedByte();
@@ -146,6 +146,6 @@ public class ConnectionStartBody extends AMQMethodBodyImpl implements EncodableA
byte[] locales = EncodingUtils.readBytes(in);
- return dispatcher.connectionStart(versionMajor, versionMinor, serverProperties, mechanisms, locales);
+ dispatcher.receiveConnectionStart(versionMajor, versionMinor, serverProperties, mechanisms, locales);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java
index ba8182e569..da3d0a2c56 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java
@@ -126,7 +126,7 @@ public class ConnectionStartOkBody extends AMQMethodBodyImpl implements Encodabl
return buf.toString();
}
- public static <T> T process(final MarkableDataInput in, final MethodProcessor<T> dispatcher)
+ public static void process(final MarkableDataInput in, final MethodProcessor dispatcher)
throws IOException, AMQFrameDecodingException
{
@@ -135,6 +135,6 @@ public class ConnectionStartOkBody extends AMQMethodBodyImpl implements Encodabl
byte[] response = EncodingUtils.readBytes(in);
AMQShortString locale = in.readAMQShortString();
- return dispatcher.connectionStartOk(clientProperties, mechanism, response, locale);
+ dispatcher.receiveConnectionStartOk(clientProperties, mechanism, response, locale);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java
index 2ca8e57e18..3383fd889a 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java
@@ -119,12 +119,12 @@ public class ConnectionTuneBody extends AMQMethodBodyImpl implements EncodableAM
return buf.toString();
}
- public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException
{
int channelMax = buffer.readUnsignedShort();
long frameMax = EncodingUtils.readUnsignedInteger(buffer);
int heartbeat = buffer.readUnsignedShort();
- return dispatcher.connectionTune(channelMax, frameMax, heartbeat);
+ dispatcher.receiveConnectionTune(channelMax, frameMax, heartbeat);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java
index 7a259b6419..f695eda2c4 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java
@@ -119,12 +119,12 @@ public class ConnectionTuneOkBody extends AMQMethodBodyImpl implements Encodable
return buf.toString();
}
- public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException
{
int channelMax = buffer.readUnsignedShort();
long frameMax = EncodingUtils.readUnsignedInteger(buffer);
int heartbeat = buffer.readUnsignedShort();
- return dispatcher.connectionTuneOk(channelMax, frameMax, heartbeat);
+ dispatcher.receiveConnectionTuneOk(channelMax, frameMax, heartbeat);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
index dc345a6cc6..01beb3af77 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
@@ -92,14 +92,14 @@ public class ContentBody implements AMQBody
return _payload;
}
- public static <T> T process(final int channel,
+ public static void process(final int channel,
final MarkableDataInput in,
- final MethodProcessor<T> methodProcessor, final long bodySize)
+ final MethodProcessor methodProcessor, final long bodySize)
throws IOException
{
byte[] payload = new byte[(int)bodySize];
in.readFully(payload);
- return methodProcessor.messageContent(channel, payload);
+ methodProcessor.receiveMessageContent(channel, payload);
}
private static class BufferContentBody implements AMQBody
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
index 081b4bdfee..0d54e09ae5 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
@@ -155,9 +155,9 @@ public class ContentHeaderBody implements AMQBody
_bodySize = bodySize;
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> methodProcessor, final long size)
+ final MethodProcessor methodProcessor, final long size)
throws IOException, AMQFrameDecodingException
{
@@ -175,6 +175,6 @@ public class ContentHeaderBody implements AMQBody
properties = new BasicContentHeaderProperties();
properties.populatePropertiesFromBuffer(buffer, propertyFlags, (int)(size-14));
- return methodProcessor.messageHeader(channelId, properties, bodySize);
+ methodProcessor.receiveMessageHeader(channelId, properties, bodySize);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java
index 8244768fb6..7548db6e93 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java
@@ -122,13 +122,13 @@ public class ExchangeBoundBody extends AMQMethodBodyImpl implements EncodableAMQ
return buf.toString();
}
- public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
throws IOException
{
AMQShortString exchange = buffer.readAMQShortString();
AMQShortString routingKey = buffer.readAMQShortString();
AMQShortString queue = buffer.readAMQShortString();
- return dispatcher.exchangeBound(channelId, exchange, routingKey, queue);
+ dispatcher.receiveExchangeBound(channelId, exchange, routingKey, queue);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java
index 2d89a9f467..869994561f 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java
@@ -108,12 +108,12 @@ public class ExchangeBoundOkBody extends AMQMethodBodyImpl implements EncodableA
return buf.toString();
}
- public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
throws IOException
{
int replyCode = buffer.readUnsignedShort();
AMQShortString replyText = buffer.readAMQShortString();
- return dispatcher.exchangeBoundOk(channelId, replyCode, replyText);
+ dispatcher.receiveExchangeBoundOk(channelId, replyCode, replyText);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java
index f96e6d382e..06e590f8e5 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java
@@ -204,9 +204,9 @@ public class ExchangeDeclareBody extends AMQMethodBodyImpl implements EncodableA
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException, AMQFrameDecodingException
+ final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException
{
int ticket = buffer.readUnsignedShort();
@@ -219,6 +219,14 @@ public class ExchangeDeclareBody extends AMQMethodBodyImpl implements EncodableA
boolean internal = (bitfield & 0x8) == 0x8;
boolean nowait = (bitfield & 0x10) == 0x10;
FieldTable arguments = EncodingUtils.readFieldTable(buffer);
- return dispatcher.exchangeDeclare(channelId, exchange, type, passive, durable, autoDelete, internal, nowait, arguments);
+ dispatcher.receiveExchangeDeclare(channelId,
+ exchange,
+ type,
+ passive,
+ durable,
+ autoDelete,
+ internal,
+ nowait,
+ arguments);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java
index 771fa63063..4a30e25502 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java
@@ -138,7 +138,7 @@ public class ExchangeDeleteBody extends AMQMethodBodyImpl implements EncodableAM
return buf.toString();
}
- public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
throws IOException
{
@@ -147,6 +147,6 @@ public class ExchangeDeleteBody extends AMQMethodBodyImpl implements EncodableAM
byte bitfield = buffer.readByte();
boolean ifUnused = (bitfield & 0x01) == 0x01;
boolean nowait = (bitfield & 0x02) == 0x02;
- return dispatcher.exchangeDelete(channelId, exchange, ifUnused, nowait);
+ dispatcher.receiveExchangeDelete(channelId, exchange, ifUnused, nowait);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
index c8b7d2639d..348df8b24d 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
@@ -20,484 +20,506 @@
*/
package org.apache.qpid.framing;
-public class FrameCreatingMethodProcessor implements MethodProcessor<AMQFrame>
+import java.util.ArrayList;
+import java.util.List;
+
+public class FrameCreatingMethodProcessor implements MethodProcessor
{
- private final MethodRegistry _methodRegistry;
+ private ProtocolVersion _protocolVersion;
+
+ private final List<AMQDataBlock> _processedMethods = new ArrayList<>();
- FrameCreatingMethodProcessor(final MethodRegistry methodRegistry)
+ public FrameCreatingMethodProcessor(final ProtocolVersion protocolVersion)
{
- _methodRegistry = methodRegistry;
+ _protocolVersion = protocolVersion;
}
+ public List<AMQDataBlock> getProcessedMethods()
+ {
+ return _processedMethods;
+ }
+
@Override
- public AMQFrame connectionStart(final short versionMajor,
- final short versionMinor,
- final FieldTable serverProperties,
- final byte[] mechanisms,
- final byte[] locales)
+ public void receiveConnectionStart(final short versionMajor,
+ final short versionMinor,
+ final FieldTable serverProperties,
+ final byte[] mechanisms,
+ final byte[] locales)
{
- return new AMQFrame(0, new ConnectionStartBody(versionMajor, versionMinor, serverProperties, mechanisms, locales));
+ _processedMethods.add(new AMQFrame(0, new ConnectionStartBody(versionMajor, versionMinor, serverProperties, mechanisms, locales)));
}
@Override
- public AMQFrame connectionStartOk(final FieldTable clientProperties,
- final AMQShortString mechanism,
- final byte[] response,
- final AMQShortString locale)
+ public void receiveConnectionStartOk(final FieldTable clientProperties,
+ final AMQShortString mechanism,
+ final byte[] response,
+ final AMQShortString locale)
{
- return new AMQFrame(0, new ConnectionStartOkBody(clientProperties, mechanism, response, locale));
+ _processedMethods.add(new AMQFrame(0, new ConnectionStartOkBody(clientProperties, mechanism, response, locale)));
}
@Override
- public AMQFrame txSelect(final int channelId)
+ public void receiveTxSelect(final int channelId)
{
- return new AMQFrame(channelId, TxSelectBody.INSTANCE);
+ _processedMethods.add(new AMQFrame(channelId, TxSelectBody.INSTANCE));
}
@Override
- public AMQFrame txSelectOk(final int channelId)
+ public void receiveTxSelectOk(final int channelId)
{
- return new AMQFrame(channelId, TxSelectOkBody.INSTANCE);
+ _processedMethods.add(new AMQFrame(channelId, TxSelectOkBody.INSTANCE));
}
@Override
- public AMQFrame txCommit(final int channelId)
+ public void receiveTxCommit(final int channelId)
{
- return new AMQFrame(channelId, TxCommitBody.INSTANCE);
+ _processedMethods.add(new AMQFrame(channelId, TxCommitBody.INSTANCE));
}
@Override
- public AMQFrame txCommitOk(final int channelId)
+ public void receiveTxCommitOk(final int channelId)
{
- return new AMQFrame(channelId, TxCommitOkBody.INSTANCE);
+ _processedMethods.add(new AMQFrame(channelId, TxCommitOkBody.INSTANCE));
}
@Override
- public AMQFrame txRollback(final int channelId)
+ public void receiveTxRollback(final int channelId)
{
- return new AMQFrame(channelId, TxRollbackBody.INSTANCE);
+ _processedMethods.add(new AMQFrame(channelId, TxRollbackBody.INSTANCE));
}
@Override
- public AMQFrame txRollbackOk(final int channelId)
+ public void receiveTxRollbackOk(final int channelId)
{
- return new AMQFrame(channelId, TxRollbackOkBody.INSTANCE);
+ _processedMethods.add(new AMQFrame(channelId, TxRollbackOkBody.INSTANCE));
}
@Override
- public AMQFrame connectionSecure(final byte[] challenge)
+ public void receiveConnectionSecure(final byte[] challenge)
{
- return new AMQFrame(0, new ConnectionSecureBody(challenge));
+ _processedMethods.add(new AMQFrame(0, new ConnectionSecureBody(challenge)));
}
@Override
- public AMQFrame connectionSecureOk(final byte[] response)
+ public void receiveConnectionSecureOk(final byte[] response)
{
- return new AMQFrame(0, new ConnectionSecureOkBody(response));
+ _processedMethods.add(new AMQFrame(0, new ConnectionSecureOkBody(response)));
}
@Override
- public AMQFrame connectionTune(final int channelMax, final long frameMax, final int heartbeat)
+ public void receiveConnectionTune(final int channelMax, final long frameMax, final int heartbeat)
{
- return new AMQFrame(0, new ConnectionTuneBody(channelMax, frameMax, heartbeat));
+ _processedMethods.add(new AMQFrame(0, new ConnectionTuneBody(channelMax, frameMax, heartbeat)));
}
@Override
- public AMQFrame connectionTuneOk(final int channelMax, final long frameMax, final int heartbeat)
+ public void receiveConnectionTuneOk(final int channelMax, final long frameMax, final int heartbeat)
{
- return new AMQFrame(0, new ConnectionTuneOkBody(channelMax, frameMax, heartbeat));
+ _processedMethods.add(new AMQFrame(0, new ConnectionTuneOkBody(channelMax, frameMax, heartbeat)));
}
@Override
- public AMQFrame connectionOpen(final AMQShortString virtualHost,
- final AMQShortString capabilities,
- final boolean insist)
+ public void receiveConnectionOpen(final AMQShortString virtualHost,
+ final AMQShortString capabilities,
+ final boolean insist)
{
- return new AMQFrame(0, new ConnectionOpenBody(virtualHost, capabilities, insist));
+ _processedMethods.add(new AMQFrame(0, new ConnectionOpenBody(virtualHost, capabilities, insist)));
}
@Override
- public AMQFrame connectionOpenOk(final AMQShortString knownHosts)
+ public void receiveConnectionOpenOk(final AMQShortString knownHosts)
{
- return new AMQFrame(0, new ConnectionOpenOkBody(knownHosts));
+ _processedMethods.add(new AMQFrame(0, new ConnectionOpenOkBody(knownHosts)));
}
@Override
- public AMQFrame connectionRedirect(final AMQShortString host, final AMQShortString knownHosts)
+ public void receiveConnectionRedirect(final AMQShortString host, final AMQShortString knownHosts)
{
- return new AMQFrame(0, new ConnectionRedirectBody(getProtocolVersion(), host, knownHosts));
+ _processedMethods.add(new AMQFrame(0, new ConnectionRedirectBody(getProtocolVersion(), host, knownHosts)));
}
@Override
- public AMQFrame connectionClose(final int replyCode,
- final AMQShortString replyText,
- final int classId,
- final int methodId)
+ public void receiveConnectionClose(final int replyCode,
+ final AMQShortString replyText,
+ final int classId,
+ final int methodId)
{
- return new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), replyCode, replyText, classId, methodId));
+ _processedMethods.add(new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), replyCode, replyText, classId, methodId)));
}
@Override
- public AMQFrame connectionCloseOk()
+ public void receiveConnectionCloseOk()
{
- return new AMQFrame(0, ProtocolVersion.v8_0.equals(getProtocolVersion())
+ _processedMethods.add(new AMQFrame(0, ProtocolVersion.v8_0.equals(getProtocolVersion())
? ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_8
- : ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_9);
+ : ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_9));
}
@Override
- public AMQFrame channelOpen(final int channelId)
+ public void receiveChannelOpen(final int channelId)
{
- return new AMQFrame(channelId, new ChannelOpenBody());
+ _processedMethods.add(new AMQFrame(channelId, new ChannelOpenBody()));
}
@Override
- public AMQFrame channelOpenOk(final int channelId)
+ public void receiveChannelOpenOk(final int channelId)
{
- return new AMQFrame(channelId, ProtocolVersion.v8_0.equals(getProtocolVersion())
+ _processedMethods.add(new AMQFrame(channelId, ProtocolVersion.v8_0.equals(getProtocolVersion())
? ChannelOpenOkBody.INSTANCE_0_8
- : ChannelOpenOkBody.INSTANCE_0_9);
+ : ChannelOpenOkBody.INSTANCE_0_9));
}
@Override
- public AMQFrame channelFlow(final int channelId, final boolean active)
+ public void receiveChannelFlow(final int channelId, final boolean active)
{
- return new AMQFrame(channelId, new ChannelFlowBody(active));
+ _processedMethods.add(new AMQFrame(channelId, new ChannelFlowBody(active)));
}
@Override
- public AMQFrame channelFlowOk(final int channelId, final boolean active)
+ public void receiveChannelFlowOk(final int channelId, final boolean active)
{
- return new AMQFrame(channelId, new ChannelFlowOkBody(active));
+ _processedMethods.add(new AMQFrame(channelId, new ChannelFlowOkBody(active)));
}
@Override
- public AMQFrame channelAlert(final int channelId,
- final int replyCode,
- final AMQShortString replyText,
- final FieldTable details)
+ public void receiveChannelAlert(final int channelId,
+ final int replyCode,
+ final AMQShortString replyText,
+ final FieldTable details)
{
- return new AMQFrame(channelId, new ChannelAlertBody(replyCode, replyText, details));
+ _processedMethods.add(new AMQFrame(channelId, new ChannelAlertBody(replyCode, replyText, details)));
}
@Override
- public AMQFrame channelClose(final int channelId,
- final int replyCode,
- final AMQShortString replyText,
- final int classId,
- final int methodId)
+ public void receiveChannelClose(final int channelId,
+ final int replyCode,
+ final AMQShortString replyText,
+ final int classId,
+ final int methodId)
{
- return new AMQFrame(channelId, new ChannelCloseBody(replyCode, replyText, classId, methodId));
+ _processedMethods.add(new AMQFrame(channelId, new ChannelCloseBody(replyCode, replyText, classId, methodId)));
}
@Override
- public AMQFrame channelCloseOk(final int channelId)
+ public void receiveChannelCloseOk(final int channelId)
{
- return new AMQFrame(channelId, ChannelCloseOkBody.INSTANCE);
+ _processedMethods.add(new AMQFrame(channelId, ChannelCloseOkBody.INSTANCE));
}
@Override
- public AMQFrame accessRequest(final int channelId,
- final AMQShortString realm,
- final boolean exclusive,
- final boolean passive,
- final boolean active,
- final boolean write,
- final boolean read)
+ public void receiveAccessRequest(final int channelId,
+ final AMQShortString realm,
+ final boolean exclusive,
+ final boolean passive,
+ final boolean active,
+ final boolean write,
+ final boolean read)
{
- return new AMQFrame(channelId, new AccessRequestBody(realm, exclusive, passive, active, write, read));
+ _processedMethods.add(new AMQFrame(channelId, new AccessRequestBody(realm, exclusive, passive, active, write, read)));
}
@Override
- public AMQFrame accessRequestOk(final int channelId, final int ticket)
+ public void receiveAccessRequestOk(final int channelId, final int ticket)
{
- return new AMQFrame(channelId, new AccessRequestOkBody(ticket));
+ _processedMethods.add(new AMQFrame(channelId, new AccessRequestOkBody(ticket)));
}
@Override
- public AMQFrame exchangeDeclare(final int channelId,
- final AMQShortString exchange,
- final AMQShortString type,
- final boolean passive,
- final boolean durable,
- final boolean autoDelete,
- final boolean internal,
- final boolean nowait, final FieldTable arguments)
+ public void receiveExchangeDeclare(final int channelId,
+ final AMQShortString exchange,
+ final AMQShortString type,
+ final boolean passive,
+ final boolean durable,
+ final boolean autoDelete,
+ final boolean internal,
+ final boolean nowait, final FieldTable arguments)
{
- return new AMQFrame(channelId, new ExchangeDeclareBody(0, exchange, type, passive, durable, autoDelete, internal, nowait, arguments));
+ _processedMethods.add(new AMQFrame(channelId, new ExchangeDeclareBody(0, exchange, type, passive, durable, autoDelete, internal, nowait, arguments)));
}
@Override
- public AMQFrame exchangeDeclareOk(final int channelId)
+ public void receiveExchangeDeclareOk(final int channelId)
{
- return new AMQFrame(channelId, new ExchangeDeclareOkBody());
+ _processedMethods.add(new AMQFrame(channelId, new ExchangeDeclareOkBody()));
}
@Override
- public AMQFrame exchangeDelete(final int channelId,
- final AMQShortString exchange,
- final boolean ifUnused,
- final boolean nowait)
+ public void receiveExchangeDelete(final int channelId,
+ final AMQShortString exchange,
+ final boolean ifUnused,
+ final boolean nowait)
{
- return new AMQFrame(channelId, new ExchangeDeleteBody(0, exchange, ifUnused, nowait));
+ _processedMethods.add(new AMQFrame(channelId, new ExchangeDeleteBody(0, exchange, ifUnused, nowait)));
}
@Override
- public AMQFrame exchangeDeleteOk(final int channelId)
+ public void receiveExchangeDeleteOk(final int channelId)
{
- return new AMQFrame(channelId, new ExchangeDeleteOkBody());
+ _processedMethods.add(new AMQFrame(channelId, new ExchangeDeleteOkBody()));
}
@Override
- public AMQFrame exchangeBound(final int channelId,
- final AMQShortString exchange,
- final AMQShortString routingKey,
- final AMQShortString queue)
+ public void receiveExchangeBound(final int channelId,
+ final AMQShortString exchange,
+ final AMQShortString routingKey,
+ final AMQShortString queue)
{
- return new AMQFrame(channelId, new ExchangeBoundBody(exchange, routingKey, queue));
+ _processedMethods.add(new AMQFrame(channelId, new ExchangeBoundBody(exchange, routingKey, queue)));
}
@Override
- public AMQFrame exchangeBoundOk(final int channelId, final int replyCode, final AMQShortString replyText)
+ public void receiveExchangeBoundOk(final int channelId, final int replyCode, final AMQShortString replyText)
{
- return new AMQFrame(channelId, new ExchangeBoundOkBody(replyCode, replyText));
+ _processedMethods.add(new AMQFrame(channelId, new ExchangeBoundOkBody(replyCode, replyText)));
}
@Override
- public AMQFrame queueBindOk(final int channelId)
+ public void receiveQueueBindOk(final int channelId)
{
- return new AMQFrame(channelId, new QueueBindOkBody());
+ _processedMethods.add(new AMQFrame(channelId, new QueueBindOkBody()));
}
@Override
- public AMQFrame queueUnbindOk(final int channelId)
+ public void receiveQueueUnbindOk(final int channelId)
{
- return new AMQFrame(channelId, new QueueUnbindOkBody());
+ _processedMethods.add(new AMQFrame(channelId, new QueueUnbindOkBody()));
}
@Override
- public AMQFrame queueDeclare(final int channelId,
- final AMQShortString queue,
- final boolean passive,
- final boolean durable,
- final boolean exclusive,
- final boolean autoDelete,
- final boolean nowait,
- final FieldTable arguments)
+ public void receiveQueueDeclare(final int channelId,
+ final AMQShortString queue,
+ final boolean passive,
+ final boolean durable,
+ final boolean exclusive,
+ final boolean autoDelete,
+ final boolean nowait,
+ final FieldTable arguments)
{
- return new AMQFrame(channelId, new QueueDeclareBody(0, queue, passive, durable, exclusive, autoDelete, nowait, arguments));
+ _processedMethods.add(new AMQFrame(channelId, new QueueDeclareBody(0, queue, passive, durable, exclusive, autoDelete, nowait, arguments)));
}
@Override
- public AMQFrame queueDeclareOk(final int channelId,
- final AMQShortString queue,
- final long messageCount,
- final long consumerCount)
+ public void receiveQueueDeclareOk(final int channelId,
+ final AMQShortString queue,
+ final long messageCount,
+ final long consumerCount)
{
- return new AMQFrame(channelId, new QueueDeclareOkBody(queue, messageCount, consumerCount));
+ _processedMethods.add(new AMQFrame(channelId, new QueueDeclareOkBody(queue, messageCount, consumerCount)));
}
@Override
- public AMQFrame queueBind(final int channelId,
- final AMQShortString queue,
- final AMQShortString exchange,
- final AMQShortString bindingKey,
- final boolean nowait,
- final FieldTable arguments)
+ public void receiveQueueBind(final int channelId,
+ final AMQShortString queue,
+ final AMQShortString exchange,
+ final AMQShortString bindingKey,
+ final boolean nowait,
+ final FieldTable arguments)
{
- return new AMQFrame(channelId, new QueueBindBody(0, queue, exchange, bindingKey, nowait, arguments));
+ _processedMethods.add(new AMQFrame(channelId, new QueueBindBody(0, queue, exchange, bindingKey, nowait, arguments)));
}
@Override
- public AMQFrame queuePurge(final int channelId, final AMQShortString queue, final boolean nowait)
+ public void receiveQueuePurge(final int channelId, final AMQShortString queue, final boolean nowait)
{
- return new AMQFrame(channelId, new QueuePurgeBody(0, queue, nowait));
+ _processedMethods.add(new AMQFrame(channelId, new QueuePurgeBody(0, queue, nowait)));
}
@Override
- public AMQFrame queuePurgeOk(final int channelId, final long messageCount)
+ public void receiveQueuePurgeOk(final int channelId, final long messageCount)
{
- return new AMQFrame(channelId, new QueuePurgeOkBody(messageCount));
+ _processedMethods.add(new AMQFrame(channelId, new QueuePurgeOkBody(messageCount)));
}
@Override
- public AMQFrame queueDelete(final int channelId,
- final AMQShortString queue,
- final boolean ifUnused,
- final boolean ifEmpty,
- final boolean nowait)
+ public void receiveQueueDelete(final int channelId,
+ final AMQShortString queue,
+ final boolean ifUnused,
+ final boolean ifEmpty,
+ final boolean nowait)
{
- return new AMQFrame(channelId, new QueueDeleteBody(0, queue, ifUnused, ifEmpty, nowait));
+ _processedMethods.add(new AMQFrame(channelId, new QueueDeleteBody(0, queue, ifUnused, ifEmpty, nowait)));
}
@Override
- public AMQFrame queueDeleteOk(final int channelId, final long messageCount)
+ public void receiveQueueDeleteOk(final int channelId, final long messageCount)
{
- return new AMQFrame(channelId, new QueueDeleteOkBody(messageCount));
+ _processedMethods.add(new AMQFrame(channelId, new QueueDeleteOkBody(messageCount)));
}
@Override
- public AMQFrame queueUnbind(final int channelId,
- final AMQShortString queue,
- final AMQShortString exchange,
- final AMQShortString bindingKey,
- final FieldTable arguments)
+ public void receiveQueueUnbind(final int channelId,
+ final AMQShortString queue,
+ final AMQShortString exchange,
+ final AMQShortString bindingKey,
+ final FieldTable arguments)
{
- return new AMQFrame(channelId, new QueueUnbindBody(0, queue, exchange, bindingKey, arguments));
+ _processedMethods.add(new AMQFrame(channelId, new QueueUnbindBody(0, queue, exchange, bindingKey, arguments)));
}
@Override
- public AMQFrame basicRecoverSyncOk(final int channelId)
+ public void receiveBasicRecoverSyncOk(final int channelId)
{
- return new AMQFrame(channelId, new BasicRecoverSyncOkBody(getProtocolVersion()));
+ _processedMethods.add(new AMQFrame(channelId, new BasicRecoverSyncOkBody(getProtocolVersion())));
}
@Override
- public AMQFrame basicRecover(final int channelId, final boolean requeue, final boolean sync)
+ public void receiveBasicRecover(final int channelId, final boolean requeue, final boolean sync)
{
if(ProtocolVersion.v8_0.equals(getProtocolVersion()) || !sync)
{
- return new AMQFrame(channelId, new BasicRecoverBody(requeue));
+ _processedMethods.add(new AMQFrame(channelId, new BasicRecoverBody(requeue)));
}
else
{
- return new AMQFrame(channelId, new BasicRecoverSyncBody(getProtocolVersion(), requeue));
+ _processedMethods.add(new AMQFrame(channelId, new BasicRecoverSyncBody(getProtocolVersion(), requeue)));
}
}
@Override
- public AMQFrame basicQos(final int channelId,
- final long prefetchSize,
- final int prefetchCount,
- final boolean global)
+ public void receiveBasicQos(final int channelId,
+ final long prefetchSize,
+ final int prefetchCount,
+ final boolean global)
{
- return new AMQFrame(channelId, new BasicQosBody(prefetchSize, prefetchCount, global));
+ _processedMethods.add(new AMQFrame(channelId, new BasicQosBody(prefetchSize, prefetchCount, global)));
}
@Override
- public AMQFrame basicQosOk(final int channelId)
+ public void receiveBasicQosOk(final int channelId)
{
- return new AMQFrame(channelId, new BasicQosOkBody());
+ _processedMethods.add(new AMQFrame(channelId, new BasicQosOkBody()));
}
@Override
- public AMQFrame basicConsume(final int channelId,
- final AMQShortString queue,
- final AMQShortString consumerTag,
- final boolean noLocal,
- final boolean noAck,
- final boolean exclusive,
- final boolean nowait,
- final FieldTable arguments)
+ public void receiveBasicConsume(final int channelId,
+ final AMQShortString queue,
+ final AMQShortString consumerTag,
+ final boolean noLocal,
+ final boolean noAck,
+ final boolean exclusive,
+ final boolean nowait,
+ final FieldTable arguments)
{
- return new AMQFrame(channelId, new BasicConsumeBody(0, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments));
+ _processedMethods.add(new AMQFrame(channelId, new BasicConsumeBody(0, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments)));
}
@Override
- public AMQFrame basicConsumeOk(final int channelId, final AMQShortString consumerTag)
+ public void receiveBasicConsumeOk(final int channelId, final AMQShortString consumerTag)
{
- return new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag));
+ _processedMethods.add(new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag)));
}
@Override
- public AMQFrame basicCancel(final int channelId, final AMQShortString consumerTag, final boolean noWait)
+ public void receiveBasicCancel(final int channelId, final AMQShortString consumerTag, final boolean noWait)
{
- return new AMQFrame(channelId, new BasicCancelBody(consumerTag, noWait));
+ _processedMethods.add(new AMQFrame(channelId, new BasicCancelBody(consumerTag, noWait)));
}
@Override
- public AMQFrame basicCancelOk(final int channelId, final AMQShortString consumerTag)
+ public void receiveBasicCancelOk(final int channelId, final AMQShortString consumerTag)
{
- return new AMQFrame(channelId, new BasicCancelOkBody(consumerTag));
+ _processedMethods.add(new AMQFrame(channelId, new BasicCancelOkBody(consumerTag)));
}
@Override
- public AMQFrame basicPublish(final int channelId,
- final AMQShortString exchange,
- final AMQShortString routingKey,
- final boolean mandatory,
- final boolean immediate)
+ public void receiveBasicPublish(final int channelId,
+ final AMQShortString exchange,
+ final AMQShortString routingKey,
+ final boolean mandatory,
+ final boolean immediate)
{
- return new AMQFrame(channelId, new BasicPublishBody(0, exchange, routingKey, mandatory, immediate));
+ _processedMethods.add(new AMQFrame(channelId, new BasicPublishBody(0, exchange, routingKey, mandatory, immediate)));
}
@Override
- public AMQFrame basicReturn(final int channelId, final int replyCode,
- final AMQShortString replyText,
- final AMQShortString exchange,
- final AMQShortString routingKey)
+ public void receiveBasicReturn(final int channelId, final int replyCode,
+ final AMQShortString replyText,
+ final AMQShortString exchange,
+ final AMQShortString routingKey)
{
- return new AMQFrame(channelId, new BasicReturnBody(replyCode, replyText, exchange, routingKey));
+ _processedMethods.add(new AMQFrame(channelId, new BasicReturnBody(replyCode, replyText, exchange, routingKey)));
}
@Override
- public AMQFrame basicDeliver(final int channelId,
- final AMQShortString consumerTag,
- final long deliveryTag,
- final boolean redelivered,
- final AMQShortString exchange,
- final AMQShortString routingKey)
+ public void receiveBasicDeliver(final int channelId,
+ final AMQShortString consumerTag,
+ final long deliveryTag,
+ final boolean redelivered,
+ final AMQShortString exchange,
+ final AMQShortString routingKey)
{
- return new AMQFrame(channelId, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey));
+ _processedMethods.add(new AMQFrame(channelId, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey)));
}
@Override
- public AMQFrame basicGet(final int channelId, final AMQShortString queue, final boolean noAck)
+ public void receiveBasicGet(final int channelId, final AMQShortString queue, final boolean noAck)
{
- return new AMQFrame(channelId, new BasicGetBody(0, queue, noAck));
+ _processedMethods.add(new AMQFrame(channelId, new BasicGetBody(0, queue, noAck)));
}
@Override
- public AMQFrame basicGetOk(final int channelId,
- final long deliveryTag,
- final boolean redelivered,
- final AMQShortString exchange,
- final AMQShortString routingKey,
- final long messageCount)
+ public void receiveBasicGetOk(final int channelId,
+ final long deliveryTag,
+ final boolean redelivered,
+ final AMQShortString exchange,
+ final AMQShortString routingKey,
+ final long messageCount)
+ {
+ _processedMethods.add(new AMQFrame(channelId, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount)));
+ }
+
+ @Override
+ public void receiveBasicGetEmpty(final int channelId)
{
- return new AMQFrame(channelId, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount));
+ _processedMethods.add(new AMQFrame(channelId, new BasicGetEmptyBody((AMQShortString)null)));
}
@Override
- public AMQFrame basicGetEmpty(final int channelId)
+ public void receiveBasicAck(final int channelId, final long deliveryTag, final boolean multiple)
{
- return new AMQFrame(channelId, new BasicGetEmptyBody((AMQShortString)null));
+ _processedMethods.add(new AMQFrame(channelId, new BasicAckBody(deliveryTag, multiple)));
}
@Override
- public AMQFrame basicAck(final int channelId, final long deliveryTag, final boolean multiple)
+ public void receiveBasicReject(final int channelId, final long deliveryTag, final boolean requeue)
{
- return new AMQFrame(channelId, new BasicAckBody(deliveryTag, multiple));
+ _processedMethods.add(new AMQFrame(channelId, new BasicRejectBody(deliveryTag, requeue)));
}
@Override
- public AMQFrame basicReject(final int channelId, final long deliveryTag, final boolean requeue)
+ public void receiveHeartbeat()
{
- return new AMQFrame(channelId, new BasicRejectBody(deliveryTag, requeue));
+ _processedMethods.add(new AMQFrame(0, new HeartbeatBody()));
}
@Override
- public AMQFrame heartbeat()
+ public ProtocolVersion getProtocolVersion()
{
- return new AMQFrame(0, new HeartbeatBody());
+ return _protocolVersion;
}
- private ProtocolVersion getProtocolVersion()
+ public void setProtocolVersion(final ProtocolVersion protocolVersion)
+ {
+ _protocolVersion = protocolVersion;
+ }
+
+ @Override
+ public void receiveMessageContent(final int channelId, final byte[] data)
{
- return _methodRegistry.getProtocolVersion();
+ _processedMethods.add(new AMQFrame(channelId, new ContentBody(data)));
}
@Override
- public AMQFrame messageContent(final int channelId, final byte[] data)
+ public void receiveMessageHeader(final int channelId,
+ final BasicContentHeaderProperties properties,
+ final long bodySize)
{
- return new AMQFrame(channelId, new ContentBody(data));
+ _processedMethods.add(new AMQFrame(channelId, new ContentHeaderBody(properties, bodySize)));
}
@Override
- public AMQFrame messageHeader(final int channelId,
- final BasicContentHeaderProperties properties,
- final long bodySize)
+ public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation)
{
- return new AMQFrame(channelId, new ContentHeaderBody(properties, bodySize));
+ _processedMethods.add(protocolInitiation);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
index 23f71c62db..b5f854eb0e 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
@@ -81,9 +81,9 @@ public class HeartbeatBody implements AMQBody
return new AMQFrame(0, this);
}
- public static <T> T process(final int channel,
+ public static void process(final int channel,
final MarkableDataInput in,
- final MethodProcessor<T> processor,
+ final MethodProcessor processor,
final long bodySize) throws IOException
{
@@ -91,6 +91,6 @@ public class HeartbeatBody implements AMQBody
{
in.skip(bodySize);
}
- return processor.heartbeat();
+ processor.receiveHeartbeat();
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java
index ecedacaba4..e995cbf181 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java
@@ -20,178 +20,182 @@
*/
package org.apache.qpid.framing;
-public interface MethodProcessor<T>
+public interface MethodProcessor
{
- T connectionStart(short versionMajor,
- short versionMinor,
- FieldTable serverProperties,
- byte[] mechanisms,
- byte[] locales);
+ ProtocolVersion getProtocolVersion();
- T connectionStartOk(FieldTable clientProperties,
- AMQShortString mechanism,
- byte[] response,
- AMQShortString locale);
+ void receiveConnectionStart(short versionMajor,
+ short versionMinor,
+ FieldTable serverProperties,
+ byte[] mechanisms,
+ byte[] locales);
- T txSelect(int channelId);
+ void receiveConnectionStartOk(FieldTable clientProperties,
+ AMQShortString mechanism,
+ byte[] response,
+ AMQShortString locale);
- T txSelectOk(int channelId);
+ void receiveTxSelect(int channelId);
- T txCommit(int channelId);
+ void receiveTxSelectOk(int channelId);
- T txCommitOk(int channelId);
+ void receiveTxCommit(int channelId);
- T txRollback(int channelId);
+ void receiveTxCommitOk(int channelId);
- T txRollbackOk(int channelId);
+ void receiveTxRollback(int channelId);
- T connectionSecure(byte[] challenge);
+ void receiveTxRollbackOk(int channelId);
- T connectionSecureOk(byte[] response);
+ void receiveConnectionSecure(byte[] challenge);
- T connectionTune(int channelMax, long frameMax, int heartbeat);
+ void receiveConnectionSecureOk(byte[] response);
- T connectionTuneOk(int channelMax, long frameMax, int heartbeat);
+ void receiveConnectionTune(int channelMax, long frameMax, int heartbeat);
- T connectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist);
+ void receiveConnectionTuneOk(int channelMax, long frameMax, int heartbeat);
- T connectionOpenOk(AMQShortString knownHosts);
+ void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist);
- T connectionRedirect(AMQShortString host, AMQShortString knownHosts);
+ void receiveConnectionOpenOk(AMQShortString knownHosts);
- T connectionClose(int replyCode, AMQShortString replyText, int classId, int methodId);
+ void receiveConnectionRedirect(AMQShortString host, AMQShortString knownHosts);
- T connectionCloseOk();
+ void receiveConnectionClose(int replyCode, AMQShortString replyText, int classId, int methodId);
- T channelOpen(int channelId);
+ void receiveConnectionCloseOk();
- T channelOpenOk(int channelId);
+ void receiveChannelOpen(int channelId);
- T channelFlow(int channelId, boolean active);
+ void receiveChannelOpenOk(int channelId);
- T channelFlowOk(int channelId, boolean active);
+ void receiveChannelFlow(int channelId, boolean active);
- T channelAlert(int channelId, int replyCode, final AMQShortString replyText, FieldTable details);
+ void receiveChannelFlowOk(int channelId, boolean active);
- T channelClose(int channelId, int replyCode, AMQShortString replyText, int classId, int methodId);
+ void receiveChannelAlert(int channelId, int replyCode, final AMQShortString replyText, FieldTable details);
- T channelCloseOk(int channelId);
+ void receiveChannelClose(int channelId, int replyCode, AMQShortString replyText, int classId, int methodId);
- T accessRequest(int channelId,
- AMQShortString realm,
- boolean exclusive,
- boolean passive,
- boolean active,
- boolean write, boolean read);
+ void receiveChannelCloseOk(int channelId);
- T accessRequestOk(int channelId, int ticket);
+ void receiveAccessRequest(int channelId,
+ AMQShortString realm,
+ boolean exclusive,
+ boolean passive,
+ boolean active,
+ boolean write, boolean read);
- T exchangeDeclare(int channelId,
- AMQShortString exchange,
- AMQShortString type,
- boolean passive,
- boolean durable,
- boolean autoDelete, boolean internal, boolean nowait, final FieldTable arguments);
+ void receiveAccessRequestOk(int channelId, int ticket);
- T exchangeDeclareOk(int channelId);
+ void receiveExchangeDeclare(int channelId,
+ AMQShortString exchange,
+ AMQShortString type,
+ boolean passive,
+ boolean durable,
+ boolean autoDelete, boolean internal, boolean nowait, final FieldTable arguments);
- T exchangeDelete(int channelId, AMQShortString exchange, boolean ifUnused, boolean nowait);
+ void receiveExchangeDeclareOk(int channelId);
- T exchangeDeleteOk(int channelId);
+ void receiveExchangeDelete(int channelId, AMQShortString exchange, boolean ifUnused, boolean nowait);
- T exchangeBound(int channelId, AMQShortString exchange, AMQShortString routingKey, AMQShortString queue);
+ void receiveExchangeDeleteOk(int channelId);
- T exchangeBoundOk(int channelId, int replyCode, AMQShortString replyText);
+ void receiveExchangeBound(int channelId, AMQShortString exchange, AMQShortString routingKey, AMQShortString queue);
- T queueBindOk(int channelId);
+ void receiveExchangeBoundOk(int channelId, int replyCode, AMQShortString replyText);
- T queueUnbindOk(final int channelId);
+ void receiveQueueBindOk(int channelId);
- T queueDeclare(int channelId,
- AMQShortString queue,
- boolean passive,
- boolean durable,
- boolean exclusive,
- boolean autoDelete, boolean nowait, FieldTable arguments);
+ void receiveQueueUnbindOk(final int channelId);
- T queueDeclareOk(int channelId, final AMQShortString queue, long messageCount, long consumerCount);
+ void receiveQueueDeclare(int channelId,
+ AMQShortString queue,
+ boolean passive,
+ boolean durable,
+ boolean exclusive,
+ boolean autoDelete, boolean nowait, FieldTable arguments);
- T queueBind(int channelId,
- AMQShortString queue,
- AMQShortString exchange,
- AMQShortString bindingKey,
- boolean nowait, FieldTable arguments);
+ void receiveQueueDeclareOk(int channelId, final AMQShortString queue, long messageCount, long consumerCount);
- T queuePurge(int channelId, AMQShortString queue, boolean nowait);
+ void receiveQueueBind(int channelId,
+ AMQShortString queue,
+ AMQShortString exchange,
+ AMQShortString bindingKey,
+ boolean nowait, FieldTable arguments);
- T queuePurgeOk(int channelId, long messageCount);
+ void receiveQueuePurge(int channelId, AMQShortString queue, boolean nowait);
- T queueDelete(int channelId, AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait);
+ void receiveQueuePurgeOk(int channelId, long messageCount);
- T queueDeleteOk(int channelId, long messageCount);
+ void receiveQueueDelete(int channelId, AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait);
- T queueUnbind(int channelId,
- AMQShortString queue,
- AMQShortString exchange,
- AMQShortString bindingKey,
- FieldTable arguments);
+ void receiveQueueDeleteOk(int channelId, long messageCount);
- T basicRecoverSyncOk(int channelId);
+ void receiveQueueUnbind(int channelId,
+ AMQShortString queue,
+ AMQShortString exchange,
+ AMQShortString bindingKey,
+ FieldTable arguments);
- T basicRecover(int channelId, final boolean requeue, boolean sync);
+ void receiveBasicRecoverSyncOk(int channelId);
- T basicQos(int channelId, long prefetchSize, int prefetchCount, boolean global);
+ void receiveBasicRecover(int channelId, final boolean requeue, boolean sync);
- T basicQosOk(int channelId);
+ void receiveBasicQos(int channelId, long prefetchSize, int prefetchCount, boolean global);
- T basicConsume(int channelId,
- AMQShortString queue,
- AMQShortString consumerTag,
- boolean noLocal,
- boolean noAck,
- boolean exclusive, boolean nowait, FieldTable arguments);
+ void receiveBasicQosOk(int channelId);
- T basicConsumeOk(int channelId, AMQShortString consumerTag);
+ void receiveBasicConsume(int channelId,
+ AMQShortString queue,
+ AMQShortString consumerTag,
+ boolean noLocal,
+ boolean noAck,
+ boolean exclusive, boolean nowait, FieldTable arguments);
- T basicCancel(int channelId, AMQShortString consumerTag, boolean noWait);
+ void receiveBasicConsumeOk(int channelId, AMQShortString consumerTag);
- T basicCancelOk(int channelId, AMQShortString consumerTag);
+ void receiveBasicCancel(int channelId, AMQShortString consumerTag, boolean noWait);
- T basicPublish(int channelId,
- AMQShortString exchange,
- AMQShortString routingKey,
- boolean mandatory,
- boolean immediate);
+ void receiveBasicCancelOk(int channelId, AMQShortString consumerTag);
- T basicReturn(final int channelId,
- int replyCode,
- AMQShortString replyText,
- AMQShortString exchange,
- AMQShortString routingKey);
+ void receiveBasicPublish(int channelId,
+ AMQShortString exchange,
+ AMQShortString routingKey,
+ boolean mandatory,
+ boolean immediate);
- T basicDeliver(int channelId,
- AMQShortString consumerTag,
- long deliveryTag,
- boolean redelivered,
- AMQShortString exchange, AMQShortString routingKey);
+ void receiveBasicReturn(final int channelId,
+ int replyCode,
+ AMQShortString replyText,
+ AMQShortString exchange,
+ AMQShortString routingKey);
- T basicGet(int channelId, AMQShortString queue, boolean noAck);
+ void receiveBasicDeliver(int channelId,
+ AMQShortString consumerTag,
+ long deliveryTag,
+ boolean redelivered,
+ AMQShortString exchange, AMQShortString routingKey);
- T basicGetOk(int channelId,
- long deliveryTag,
- boolean redelivered,
- AMQShortString exchange,
- AMQShortString routingKey, long messageCount);
+ void receiveBasicGet(int channelId, AMQShortString queue, boolean noAck);
- T basicGetEmpty(int channelId);
+ void receiveBasicGetOk(int channelId,
+ long deliveryTag,
+ boolean redelivered,
+ AMQShortString exchange,
+ AMQShortString routingKey, long messageCount);
- T basicAck(int channelId, long deliveryTag, boolean multiple);
+ void receiveBasicGetEmpty(int channelId);
- T basicReject(int channelId, long deliveryTag, boolean requeue);
+ void receiveBasicAck(int channelId, long deliveryTag, boolean multiple);
- T heartbeat();
+ void receiveBasicReject(int channelId, long deliveryTag, boolean requeue);
- T messageContent(int channelId, byte[] data);
+ void receiveHeartbeat();
- T messageHeader(int channelId, BasicContentHeaderProperties properties, long bodySize);
+ void receiveMessageContent(int channelId, byte[] data);
+
+ void receiveMessageHeader(int channelId, BasicContentHeaderProperties properties, long bodySize);
+
+ void receiveProtocolHeader(ProtocolInitiation protocolInitiation);
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java b/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java
index c4fd131d0e..45c198942b 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java
@@ -31,14 +31,12 @@ package org.apache.qpid.framing;
public final class MethodRegistry
{
- private final FrameCreatingMethodProcessor _methodProcessor;
private ProtocolVersion _protocolVersion;
public MethodRegistry(ProtocolVersion pv)
{
_protocolVersion = pv;
- _methodProcessor = new FrameCreatingMethodProcessor(this);
}
public void setProtocolVersion(final ProtocolVersion protocolVersion)
@@ -555,10 +553,5 @@ public final class MethodRegistry
return _protocolVersion;
}
- public FrameCreatingMethodProcessor getMethodProcessor()
- {
- return _methodProcessor;
- }
-
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java
index 42e1c44d7d..e4419f77e3 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java
@@ -165,9 +165,9 @@ public class QueueBindBody extends AMQMethodBodyImpl implements EncodableAMQData
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException, AMQFrameDecodingException
+ final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException
{
int ticket = buffer.readUnsignedShort();
@@ -176,6 +176,6 @@ public class QueueBindBody extends AMQMethodBodyImpl implements EncodableAMQData
AMQShortString bindingKey = buffer.readAMQShortString();
boolean nowait = (buffer.readByte() & 0x01) == 0x01;
FieldTable arguments = EncodingUtils.readFieldTable(buffer);
- return dispatcher.queueBind(channelId, queue, exchange, bindingKey, nowait, arguments);
+ dispatcher.receiveQueueBind(channelId, queue, exchange, bindingKey, nowait, arguments);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java
index 3a8d2f41a5..1f9888c76a 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java
@@ -191,9 +191,9 @@ public class QueueDeclareBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException, AMQFrameDecodingException
+ final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException
{
int ticket = buffer.readUnsignedShort();
@@ -206,6 +206,6 @@ public class QueueDeclareBody extends AMQMethodBodyImpl implements EncodableAMQD
boolean autoDelete = (bitfield & 0x08 ) == 0x08;
boolean nowait = (bitfield & 0x010 ) == 0x010;
FieldTable arguments = EncodingUtils.readFieldTable(buffer);
- return dispatcher.queueDeclare(channelId, queue, passive, durable, exclusive, autoDelete, nowait, arguments);
+ dispatcher.receiveQueueDeclare(channelId, queue, passive, durable, exclusive, autoDelete, nowait, arguments);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java
index 47deb9cd6d..9857bb3a39 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java
@@ -120,13 +120,13 @@ public class QueueDeclareOkBody extends AMQMethodBodyImpl implements EncodableAM
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
AMQShortString queue = buffer.readAMQShortString();
long messageCount = EncodingUtils.readUnsignedInteger(buffer);
long consumerCount = EncodingUtils.readUnsignedInteger(buffer);
- return dispatcher.queueDeclareOk(channelId, queue, messageCount, consumerCount);
+ dispatcher.receiveQueueDeclareOk(channelId, queue, messageCount, consumerCount);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java
index fc9795f48b..408f9f9667 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java
@@ -151,9 +151,9 @@ public class QueueDeleteBody extends AMQMethodBodyImpl implements EncodableAMQDa
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
int ticket = buffer.readUnsignedShort();
@@ -163,6 +163,6 @@ public class QueueDeleteBody extends AMQMethodBodyImpl implements EncodableAMQDa
boolean ifUnused = (bitfield & 0x01) == 0x01;
boolean ifEmpty = (bitfield & 0x02) == 0x02;
boolean nowait = (bitfield & 0x04) == 0x04;
- return dispatcher.queueDelete(channelId, queue, ifUnused, ifEmpty, nowait);
+ dispatcher.receiveQueueDelete(channelId, queue, ifUnused, ifEmpty, nowait);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java
index b04f844084..b43369b68a 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java
@@ -95,11 +95,11 @@ public class QueueDeleteOkBody extends AMQMethodBodyImpl implements EncodableAMQ
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
long messageCount = EncodingUtils.readUnsignedInteger(buffer);
- return dispatcher.queueDeleteOk(channelId, messageCount);
+ dispatcher.receiveQueueDeleteOk(channelId, messageCount);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java
index d2f41922cc..5a04e21355 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java
@@ -125,14 +125,14 @@ public class QueuePurgeBody extends AMQMethodBodyImpl implements EncodableAMQDat
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
int ticket = buffer.readUnsignedShort();
AMQShortString queue = buffer.readAMQShortString();
boolean nowait = (buffer.readByte() & 0x01) == 0x01;
- return dispatcher.queuePurge(channelId, queue, nowait);
+ dispatcher.receiveQueuePurge(channelId, queue, nowait);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java
index da5ba766ae..40cac8b390 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java
@@ -95,11 +95,11 @@ public class QueuePurgeOkBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
long messageCount = EncodingUtils.readUnsignedInteger(buffer);
- return dispatcher.queuePurgeOk(channelId, messageCount);
+ dispatcher.receiveQueuePurgeOk(channelId, messageCount);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java
index 968cc02212..a6f3e5b4c5 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java
@@ -147,9 +147,9 @@ public class QueueUnbindBody extends AMQMethodBodyImpl implements EncodableAMQDa
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException, AMQFrameDecodingException
+ final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException
{
int ticket = buffer.readUnsignedShort();
@@ -157,6 +157,6 @@ public class QueueUnbindBody extends AMQMethodBodyImpl implements EncodableAMQDa
AMQShortString exchange = buffer.readAMQShortString();
AMQShortString routingKey = buffer.readAMQShortString();
FieldTable arguments = EncodingUtils.readFieldTable(buffer);
- return dispatcher.queueUnbind(channelId, queue, exchange, routingKey, arguments);
+ dispatcher.receiveQueueUnbind(channelId, queue, exchange, routingKey, arguments);
}
}
diff --git a/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java b/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
index c61bfb302b..63696515c6 100644
--- a/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
+++ b/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
@@ -25,7 +25,7 @@ import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
+import java.util.List;
import junit.framework.TestCase;
@@ -33,19 +33,21 @@ import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.framing.AMQProtocolVersionException;
+import org.apache.qpid.framing.FrameCreatingMethodProcessor;
import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolVersion;
public class AMQDecoderTest extends TestCase
{
private AMQDecoder _decoder;
+ private FrameCreatingMethodProcessor _methodProcessor;
public void setUp()
{
- _decoder = new AMQDecoder(false, new MethodRegistry(ProtocolVersion.v0_91));
+ _methodProcessor = new FrameCreatingMethodProcessor(ProtocolVersion.v0_91);
+ _decoder = new AMQDecoder(false, _methodProcessor);
}
@@ -59,7 +61,8 @@ public class AMQDecoderTest extends TestCase
public void testSingleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException, IOException
{
ByteBuffer msg = getHeartbeatBodyBuffer();
- ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msg);
+ _decoder.decodeBuffer(msg);
+ List<AMQDataBlock> frames = _methodProcessor.getProcessedMethods();
if (frames.get(0) instanceof AMQFrame)
{
assertEquals(HeartbeatBody.FRAME.getBodyFrame().getFrameType(), ((AMQFrame) frames.get(0)).getBodyFrame().getFrameType());
@@ -79,9 +82,12 @@ public class AMQDecoderTest extends TestCase
msgA.limit(msgaLimit);
msg.position(msgbPos);
ByteBuffer msgB = msg.slice();
- ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msgA);
+
+ _decoder.decodeBuffer(msgA);
+ List<AMQDataBlock> frames = _methodProcessor.getProcessedMethods();
assertEquals(0, frames.size());
- frames = _decoder.decodeBuffer(msgB);
+
+ _decoder.decodeBuffer(msgB);
assertEquals(1, frames.size());
if (frames.get(0) instanceof AMQFrame)
{
@@ -101,7 +107,8 @@ public class AMQDecoderTest extends TestCase
msg.put(msgA);
msg.put(msgB);
msg.flip();
- ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msg);
+ _decoder.decodeBuffer(msg);
+ List<AMQDataBlock> frames = _methodProcessor.getProcessedMethods();
assertEquals(2, frames.size());
for (AMQDataBlock frame : frames)
{
@@ -138,12 +145,15 @@ public class AMQDecoderTest extends TestCase
sliceB.put(msgC);
sliceB.flip();
msgC.limit(limit);
-
- ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(sliceA);
+
+ _decoder.decodeBuffer(sliceA);
+ List<AMQDataBlock> frames = _methodProcessor.getProcessedMethods();
assertEquals(1, frames.size());
- frames = _decoder.decodeBuffer(sliceB);
+ frames.clear();
+ _decoder.decodeBuffer(sliceB);
assertEquals(1, frames.size());
- frames = _decoder.decodeBuffer(msgC);
+ frames.clear();
+ _decoder.decodeBuffer(msgC);
assertEquals(1, frames.size());
for (AMQDataBlock frame : frames)
{
diff --git a/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java b/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
index 353d17ac03..b4a8155978 100644
--- a/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
+++ b/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
@@ -27,7 +27,6 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -41,6 +40,7 @@ import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
+import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQDataBlockDecoder;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQFrameDecodingException;
@@ -51,7 +51,7 @@ import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ConnectionStartOkBody;
import org.apache.qpid.framing.ConnectionTuneOkBody;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.FrameCreatingMethodProcessor;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.server.model.AuthenticationProvider;
@@ -110,11 +110,11 @@ public class MaxFrameSizeTest extends QpidBrokerTestCase
{
@Override
- public void evaluate(final Socket socket, final List<AMQFrame> frames)
+ public void evaluate(final Socket socket, final List<AMQDataBlock> frames)
{
if(!socket.isClosed())
{
- AMQFrame lastFrame = frames.get(frames.size() - 1);
+ AMQFrame lastFrame = (AMQFrame) frames.get(frames.size() - 1);
assertTrue("Connection should not be possible with a frame size < " + Constant.MIN_MAX_FRAME_SIZE, lastFrame.getBodyFrame() instanceof ConnectionCloseBody);
}
}
@@ -159,11 +159,11 @@ public class MaxFrameSizeTest extends QpidBrokerTestCase
{
@Override
- public void evaluate(final Socket socket, final List<AMQFrame> frames)
+ public void evaluate(final Socket socket, final List<AMQDataBlock> frames)
{
if(!socket.isClosed())
{
- AMQFrame lastFrame = frames.get(frames.size() - 1);
+ AMQFrame lastFrame = (AMQFrame) frames.get(frames.size() - 1);
assertTrue("Connection should not be possible with a frame size larger than the broker requested", lastFrame.getBodyFrame() instanceof ConnectionCloseBody);
}
}
@@ -173,7 +173,7 @@ public class MaxFrameSizeTest extends QpidBrokerTestCase
private static interface ResultEvaluator
{
- void evaluate(Socket socket, List<AMQFrame> frames);
+ void evaluate(Socket socket, List<AMQDataBlock> frames);
}
private void doAMQP08test(int frameSize, ResultEvaluator evaluator)
@@ -236,17 +236,14 @@ public class MaxFrameSizeTest extends QpidBrokerTestCase
byte[] serverData = baos.toByteArray();
ByteArrayDataInput badi = new ByteArrayDataInput(serverData);
AMQDataBlockDecoder datablockDecoder = new AMQDataBlockDecoder();
- final MethodRegistry methodRegistry_0_91 = new MethodRegistry(ProtocolVersion.v0_91);
+ final FrameCreatingMethodProcessor methodProcessor = new FrameCreatingMethodProcessor(ProtocolVersion.v0_91);
- List<AMQFrame> frames = new ArrayList<>();
while (datablockDecoder.decodable(badi))
{
- frames.add(datablockDecoder.createAndPopulateFrame(methodRegistry_0_91.getProtocolVersion(),
- methodRegistry_0_91.getMethodProcessor(),
- badi));
+ datablockDecoder.processInput(methodProcessor, badi);
}
- evaluator.evaluate(socket, frames);
+ evaluator.evaluate(socket, methodProcessor.getProcessedMethods());
}
private static class TestClientDelegate extends ClientDelegate