From 89b0cfc3fd36bd8679c3f9343289c3dd98f5394f Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 10 Oct 2014 09:54:36 +0000 Subject: More refactoring git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6125-ProtocolRefactoring@1630745 13f79535-47bb-0310-9956-ffa450edef68 --- .../store/berkeleydb/BDBMessageStoreTest.java | 4 - .../server/protocol/v0_8/AMQProtocolEngine.java | 42 +- .../qpid/client/protocol/AMQProtocolHandler.java | 11 +- .../qpid/client/protocol/AMQProtocolSession.java | 12 +- .../java/org/apache/qpid/codec/AMQDecoder.java | 24 +- .../apache/qpid/framing/AMQDataBlockDecoder.java | 269 +++++++----- .../org/apache/qpid/framing/AccessRequestBody.java | 6 +- .../apache/qpid/framing/AccessRequestOkBody.java | 4 +- .../java/org/apache/qpid/framing/BasicAckBody.java | 6 +- .../org/apache/qpid/framing/BasicCancelBody.java | 6 +- .../org/apache/qpid/framing/BasicCancelOkBody.java | 4 +- .../org/apache/qpid/framing/BasicConsumeBody.java | 4 +- .../apache/qpid/framing/BasicConsumeOkBody.java | 4 +- .../org/apache/qpid/framing/BasicDeliverBody.java | 6 +- .../java/org/apache/qpid/framing/BasicGetBody.java | 4 +- .../org/apache/qpid/framing/BasicGetEmptyBody.java | 6 +- .../org/apache/qpid/framing/BasicGetOkBody.java | 6 +- .../org/apache/qpid/framing/BasicPublishBody.java | 6 +- .../java/org/apache/qpid/framing/BasicQosBody.java | 6 +- .../org/apache/qpid/framing/BasicRecoverBody.java | 6 +- .../apache/qpid/framing/BasicRecoverSyncBody.java | 6 +- .../org/apache/qpid/framing/BasicRejectBody.java | 6 +- .../org/apache/qpid/framing/BasicReturnBody.java | 6 +- .../org/apache/qpid/framing/ChannelAlertBody.java | 4 +- .../org/apache/qpid/framing/ChannelCloseBody.java | 6 +- .../org/apache/qpid/framing/ChannelFlowBody.java | 6 +- .../org/apache/qpid/framing/ChannelFlowOkBody.java | 4 +- .../org/apache/qpid/framing/ChannelOpenBody.java | 6 +- .../org/apache/qpid/framing/ChannelOpenOkBody.java | 6 +- .../apache/qpid/framing/ConnectionCloseBody.java | 4 +- .../apache/qpid/framing/ConnectionOpenBody.java | 4 +- .../apache/qpid/framing/ConnectionOpenOkBody.java | 4 +- .../qpid/framing/ConnectionRedirectBody.java | 4 +- .../apache/qpid/framing/ConnectionSecureBody.java | 4 +- .../qpid/framing/ConnectionSecureOkBody.java | 4 +- .../apache/qpid/framing/ConnectionStartBody.java | 4 +- .../apache/qpid/framing/ConnectionStartOkBody.java | 4 +- .../apache/qpid/framing/ConnectionTuneBody.java | 4 +- .../apache/qpid/framing/ConnectionTuneOkBody.java | 4 +- .../java/org/apache/qpid/framing/ContentBody.java | 6 +- .../org/apache/qpid/framing/ContentHeaderBody.java | 6 +- .../org/apache/qpid/framing/ExchangeBoundBody.java | 4 +- .../apache/qpid/framing/ExchangeBoundOkBody.java | 4 +- .../apache/qpid/framing/ExchangeDeclareBody.java | 14 +- .../apache/qpid/framing/ExchangeDeleteBody.java | 4 +- .../qpid/framing/FrameCreatingMethodProcessor.java | 464 +++++++++++---------- .../org/apache/qpid/framing/HeartbeatBody.java | 6 +- .../org/apache/qpid/framing/MethodProcessor.java | 230 +++++----- .../org/apache/qpid/framing/MethodRegistry.java | 7 - .../org/apache/qpid/framing/QueueBindBody.java | 6 +- .../org/apache/qpid/framing/QueueDeclareBody.java | 6 +- .../apache/qpid/framing/QueueDeclareOkBody.java | 6 +- .../org/apache/qpid/framing/QueueDeleteBody.java | 6 +- .../org/apache/qpid/framing/QueueDeleteOkBody.java | 6 +- .../org/apache/qpid/framing/QueuePurgeBody.java | 6 +- .../org/apache/qpid/framing/QueuePurgeOkBody.java | 6 +- .../org/apache/qpid/framing/QueueUnbindBody.java | 6 +- .../java/org/apache/qpid/codec/AMQDecoderTest.java | 32 +- .../apache/qpid/transport/MaxFrameSizeTest.java | 23 +- 59 files changed, 760 insertions(+), 614 deletions(-) diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index d9aa6e2d11..0c8a63eb5b 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -30,8 +30,6 @@ import java.util.Arrays; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.framing.MessagePublishInfo; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10; @@ -241,8 +239,6 @@ public class BDBMessageStoreTest extends MessageStoreTestCase private ContentHeaderBody createContentHeaderBody_0_8(BasicContentHeaderProperties props, int length) { - MethodRegistry methodRegistry = new MethodRegistry(ProtocolVersion.v0_9); - int classForBasic = methodRegistry.createBasicQosOkBody().getClazz(); return new ContentHeaderBody(props, length); } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 9d92337a62..1d0c0a9b25 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -133,6 +133,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi /* AMQP Version for this session */ private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); private final MethodRegistry _methodRegistry = new MethodRegistry(_protocolVersion); + private final FrameCreatingMethodProcessor _methodProcessor = new FrameCreatingMethodProcessor(_protocolVersion); private final List> _taskList = new CopyOnWriteArrayList>(); @@ -185,7 +186,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _transport = transport; _maxNoOfChannels = broker.getConnection_sessionCountLimit(); _receivedLock = new ReentrantLock(); - _decoder = new AMQDecoder(true, _methodRegistry); + _decoder = new AMQDecoder(true, _methodProcessor); _connectionID = connectionId; _logSubject = new ConnectionLogSubject(this); @@ -296,10 +297,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _readBytes += msg.remaining(); _receivedLock.lock(); + List processedMethods = _methodProcessor.getProcessedMethods(); try { - final ArrayList dataBlocks = _decoder.decodeBuffer(msg); - for (AMQDataBlock dataBlock : dataBlocks) + _decoder.decodeBuffer(msg); + for (AMQDataBlock dataBlock : processedMethods) { try { @@ -320,6 +322,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi break; } } + processedMethods.clear(); receivedComplete(); } catch (ConnectionScopedRuntimeException e) @@ -349,6 +352,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } finally { + processedMethods.clear(); _receivedLock.unlock(); } return null; @@ -1089,13 +1093,32 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private void closeConnection(int channelId, AMQConnectionException e) { - try + + if (_logger.isInfoEnabled()) { - if (_logger.isInfoEnabled()) - { - _logger.info("Closing connection due to: " + e); - } + _logger.info("Closing connection due to: " + e); + } + closeConnection(channelId, e.getCloseFrame()); + } + + + void closeConnection(AMQConstant errorCode, + String message, int channelId, + int classId, + int methodId) + { + + if (_logger.isInfoEnabled()) + { + _logger.info("Closing connection due to: " + message); + } + closeConnection(channelId, new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(), AMQShortString.validValueOf(message), classId, methodId))); + } + private void closeConnection(int channelId, AMQFrame frame) + { + try + { markChannelAwaitingCloseOk(channelId); closeSession(); } @@ -1103,7 +1126,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { try { - writeFrame(e.getCloseFrame()); + writeFrame(frame); } finally { @@ -1208,6 +1231,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { _protocolVersion = pv; _methodRegistry.setProtocolVersion(_protocolVersion); + _methodProcessor.setProtocolVersion(_protocolVersion); _protocolOutputConverter = new ProtocolOutputConverterImpl(this); _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(this); } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 0bf83fe301..695b7c3253 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -23,8 +23,8 @@ package org.apache.qpid.client.protocol; import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; @@ -193,7 +193,7 @@ public class AMQProtocolHandler implements ProtocolEngine _connection = con; _protocolSession = new AMQProtocolSession(this, _connection); _stateManager = new AMQStateManager(_protocolSession); - _decoder = new AMQDecoder(false, _protocolSession.getMethodRegistry()); + _decoder = new AMQDecoder(false, _protocolSession.getMethodProcessor()); _failoverHandler = new FailoverHandler(this); } @@ -459,9 +459,10 @@ public class AMQProtocolHandler implements ProtocolEngine { _readBytes += msg.remaining(); _lastReadTime = System.currentTimeMillis(); + final List dataBlocks = _protocolSession.getMethodProcessor().getProcessedMethods(); try { - final ArrayList dataBlocks = _decoder.decodeBuffer(msg); + _decoder.decodeBuffer(msg); // Decode buffer int size = dataBlocks.size(); @@ -511,6 +512,10 @@ public class AMQProtocolHandler implements ProtocolEngine propagateExceptionToFrameListeners(e); exception(e); } + finally + { + dataBlocks.clear(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 8a16c1c8a5..2fbb13079e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -44,6 +44,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FrameCreatingMethodProcessor; import org.apache.qpid.framing.HeartbeatBody; import org.apache.qpid.framing.MethodDispatcher; import org.apache.qpid.framing.MethodRegistry; @@ -91,6 +92,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession private final MethodRegistry _methodRegistry = new MethodRegistry(ProtocolVersion.getLatestSupportedVersion()); + private final FrameCreatingMethodProcessor _methodProcessor = + new FrameCreatingMethodProcessor(ProtocolVersion.getLatestSupportedVersion()); + private MethodDispatcher _methodDispatcher; private final AMQConnection _connection; @@ -416,7 +420,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession _logger.debug("Setting ProtocolVersion to :" + pv); } _protocolVersion = pv; - _methodRegistry.setProtocolVersion(pv);; + _methodRegistry.setProtocolVersion(pv); + _methodProcessor.setProtocolVersion(pv); _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this); } @@ -549,4 +554,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { _protocolHandler.setMaxFrameSize(frameMax); } + + public FrameCreatingMethodProcessor getMethodProcessor() + { + return _methodProcessor; + } } diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java index 5663a2c58c..b7904303b5 100644 --- a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java @@ -30,14 +30,13 @@ import java.util.ArrayList; import java.util.List; import java.util.ListIterator; -import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQDataBlockDecoder; import org.apache.qpid.framing.AMQFrameDecodingException; import org.apache.qpid.framing.AMQProtocolVersionException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ByteArrayDataInput; import org.apache.qpid.framing.EncodingUtils; -import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.MethodProcessor; import org.apache.qpid.framing.ProtocolInitiation; /** @@ -54,7 +53,8 @@ import org.apache.qpid.framing.ProtocolInitiation; */ public class AMQDecoder { - private final MethodRegistry _registry; + private final MethodProcessor _methodProcessor; + /** Holds the 'normal' AMQP data decoder. */ private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder(); @@ -73,12 +73,12 @@ public class AMQDecoder * Creates a new AMQP decoder. * * @param expectProtocolInitiation true if this decoder needs to handle protocol initiation. - * @param registry method registry + * @param methodProcessor method processor */ - public AMQDecoder(boolean expectProtocolInitiation, MethodRegistry registry) + public AMQDecoder(boolean expectProtocolInitiation, MethodProcessor methodProcessor) { _expectProtocolInitiation = expectProtocolInitiation; - _registry = registry; + _methodProcessor = methodProcessor; } @@ -217,14 +217,13 @@ public class AMQDecoder } - public ArrayList decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException + public void decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException { - // get prior remaining data from accumulator - ArrayList dataBlocks = new ArrayList(); MarkableDataInput msg; + // get prior remaining data from accumulator ByteArrayInputStream bais; DataInput di; if(!_remainingBufs.isEmpty()) @@ -258,9 +257,7 @@ public class AMQDecoder enoughData = _dataBlockDecoder.decodable(msg); if (enoughData) { - dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_registry.getProtocolVersion(), - _registry.getMethodProcessor(), - msg)); + _dataBlockDecoder.processInput(_methodProcessor, msg); } } else @@ -268,7 +265,7 @@ public class AMQDecoder enoughData = _piDecoder.decodable(msg); if (enoughData) { - dataBlocks.add(new ProtocolInitiation(msg)); + _methodProcessor.receiveProtocolHeader(new ProtocolInitiation(msg)); } } @@ -305,6 +302,5 @@ public class AMQDecoder } } } - return dataBlocks; } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index 0a0a570bc3..a05e3db139 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -35,7 +35,8 @@ public class AMQDataBlockDecoder private int _maxFrameSize = AMQConstant.FRAME_MIN_SIZE.getCode(); public AMQDataBlockDecoder() - { } + { + } public boolean decodable(MarkableDataInput in) throws AMQFrameDecodingException, IOException { @@ -52,9 +53,13 @@ public class AMQDataBlockDecoder // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt() final long bodySize = in.readInt() & 0xffffffffL; - if(bodySize > _maxFrameSize) + if (bodySize > _maxFrameSize) { - throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Incoming frame size of "+bodySize+" is larger than negotiated maximum of " + _maxFrameSize); + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, + "Incoming frame size of " + + bodySize + + " is larger than negotiated maximum of " + + _maxFrameSize); } in.reset(); @@ -62,9 +67,8 @@ public class AMQDataBlockDecoder } - public T createAndPopulateFrame(ProtocolVersion pv, - MethodProcessor processor, - MarkableDataInput in) + public void processInput(MethodProcessor processor, + MarkableDataInput in) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException { final byte type = in.readByte(); @@ -75,24 +79,24 @@ public class AMQDataBlockDecoder // bodySize can be zero if ((channel < 0) || (bodySize < 0)) { - throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Undecodable frame: type = " + type + " channel = " + channel - + " bodySize = " + bodySize); + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, + "Undecodable frame: type = " + type + " channel = " + channel + + " bodySize = " + bodySize); } - T result; - switch(type) + switch (type) { case 1: - result = processMethod(channel, in, processor, pv); + processMethod(channel, in, processor); break; case 2: - result = ContentHeaderBody.process(channel, in, processor, bodySize); + ContentHeaderBody.process(channel, in, processor, bodySize); break; case 3: - result = ContentBody.process(channel, in, processor, bodySize); + ContentBody.process(channel, in, processor, bodySize); break; case 8: - result = HeartbeatBody.process(channel, in, processor, bodySize); + HeartbeatBody.process(channel, in, processor, bodySize); break; default: throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Unsupported frame type: " + type); @@ -101,11 +105,11 @@ public class AMQDataBlockDecoder byte marker = in.readByte(); if ((marker & 0xFF) != 0xCE) { - throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "End of frame marker not found. Read " + marker + " length=" + bodySize - + " type=" + type); + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, + "End of frame marker not found. Read " + marker + " length=" + bodySize + + " type=" + type); } - return result; } public void setMaxFrameSize(final int maxFrameSize) @@ -113,200 +117,277 @@ public class AMQDataBlockDecoder _maxFrameSize = maxFrameSize; } - private T processMethod(int channelId, MarkableDataInput in, MethodProcessor dispatcher, ProtocolVersion protocolVersion) + private void processMethod(int channelId, + MarkableDataInput in, + MethodProcessor dispatcher) throws AMQFrameDecodingException, IOException { final int classAndMethod = in.readInt(); - switch (classAndMethod) { //CONNECTION_CLASS: case 0x000a000a: - return ConnectionStartBody.process(in, dispatcher); + ConnectionStartBody.process(in, dispatcher); + break; case 0x000a000b: - return ConnectionStartOkBody.process(in, dispatcher); + ConnectionStartOkBody.process(in, dispatcher); + break; case 0x000a0014: - return ConnectionSecureBody.process(in, dispatcher); + ConnectionSecureBody.process(in, dispatcher); + break; case 0x000a0015: - return ConnectionSecureOkBody.process(in, dispatcher); + ConnectionSecureOkBody.process(in, dispatcher); + break; case 0x000a001e: - return ConnectionTuneBody.process(in, dispatcher); + ConnectionTuneBody.process(in, dispatcher); + break; case 0x000a001f: - return ConnectionTuneOkBody.process(in, dispatcher); + ConnectionTuneOkBody.process(in, dispatcher); + break; case 0x000a0028: - return ConnectionOpenBody.process(in, dispatcher); + ConnectionOpenBody.process(in, dispatcher); + break; case 0x000a0029: - return ConnectionOpenOkBody.process(in, dispatcher); + ConnectionOpenOkBody.process(in, dispatcher); + break; case 0x000a002a: - return ConnectionRedirectBody.process(in, dispatcher); + ConnectionRedirectBody.process(in, dispatcher); + break; case 0x000a0032: - if (protocolVersion.equals(ProtocolVersion.v8_0)) + if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) { - return ConnectionRedirectBody.process(in, dispatcher); + ConnectionRedirectBody.process(in, dispatcher); } else { - return ConnectionCloseBody.process(in, dispatcher); + ConnectionCloseBody.process(in, dispatcher); } + break; case 0x000a0033: - if (protocolVersion.equals(ProtocolVersion.v8_0)) + if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) { - throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), protocolVersion); + throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), + dispatcher.getProtocolVersion()); } else { - return dispatcher.connectionCloseOk(); + dispatcher.receiveConnectionCloseOk(); } + break; case 0x000a003c: - if (protocolVersion.equals(ProtocolVersion.v8_0)) + if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) { - return ConnectionCloseBody.process(in, dispatcher); + ConnectionCloseBody.process(in, dispatcher); } else { - throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), protocolVersion); + throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), + dispatcher.getProtocolVersion()); } + break; case 0x000a003d: - if (protocolVersion.equals(ProtocolVersion.v8_0)) + if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) { - return dispatcher.connectionCloseOk(); + dispatcher.receiveConnectionCloseOk(); } else { - throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), protocolVersion); + throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), + dispatcher.getProtocolVersion()); } + break; // CHANNEL_CLASS: case 0x0014000a: - return ChannelOpenBody.process(channelId, in, dispatcher); + ChannelOpenBody.process(channelId, in, dispatcher); + break; case 0x0014000b: - return ChannelOpenOkBody.process(channelId, in, protocolVersion, dispatcher); + ChannelOpenOkBody.process(channelId, in, dispatcher.getProtocolVersion(), dispatcher); + break; case 0x00140014: - return ChannelFlowBody.process(channelId, in, dispatcher); + ChannelFlowBody.process(channelId, in, dispatcher); + break; case 0x00140015: - return ChannelFlowOkBody.process(channelId, in, dispatcher); + ChannelFlowOkBody.process(channelId, in, dispatcher); + break; case 0x0014001e: - return ChannelAlertBody.process(channelId, in, dispatcher); + ChannelAlertBody.process(channelId, in, dispatcher); + break; case 0x00140028: - return ChannelCloseBody.process(channelId, in, dispatcher); + ChannelCloseBody.process(channelId, in, dispatcher); + break; case 0x00140029: - return dispatcher.channelCloseOk(channelId); + dispatcher.receiveChannelCloseOk(channelId); + break; // ACCESS_CLASS: case 0x001e000a: - return AccessRequestBody.process(channelId, in, dispatcher); + AccessRequestBody.process(channelId, in, dispatcher); + break; case 0x001e000b: - return AccessRequestOkBody.process(channelId, in, dispatcher); + AccessRequestOkBody.process(channelId, in, dispatcher); + break; // EXCHANGE_CLASS: case 0x0028000a: - return ExchangeDeclareBody.process(channelId, in, dispatcher); + ExchangeDeclareBody.process(channelId, in, dispatcher); + break; case 0x0028000b: - return dispatcher.exchangeDeclareOk(channelId); + dispatcher.receiveExchangeDeclareOk(channelId); + break; case 0x00280014: - return ExchangeDeleteBody.process(channelId, in, dispatcher); + ExchangeDeleteBody.process(channelId, in, dispatcher); + break; case 0x00280015: - return dispatcher.exchangeDeleteOk(channelId); + dispatcher.receiveExchangeDeleteOk(channelId); + break; case 0x00280016: - return ExchangeBoundBody.process(channelId, in, dispatcher); + ExchangeBoundBody.process(channelId, in, dispatcher); + break; case 0x00280017: - return ExchangeBoundOkBody.process(channelId, in, dispatcher); + ExchangeBoundOkBody.process(channelId, in, dispatcher); + break; // QUEUE_CLASS: case 0x0032000a: - return QueueDeclareBody.process(channelId, in, dispatcher); + QueueDeclareBody.process(channelId, in, dispatcher); + break; case 0x0032000b: - return QueueDeclareOkBody.process(channelId, in, dispatcher); + QueueDeclareOkBody.process(channelId, in, dispatcher); + break; case 0x00320014: - return QueueBindBody.process(channelId, in, dispatcher); + QueueBindBody.process(channelId, in, dispatcher); + break; case 0x00320015: - return dispatcher.queueBindOk(channelId); + dispatcher.receiveQueueBindOk(channelId); + break; case 0x0032001e: - return QueuePurgeBody.process(channelId, in, dispatcher); + QueuePurgeBody.process(channelId, in, dispatcher); + break; case 0x0032001f: - return QueuePurgeOkBody.process(channelId, in, dispatcher); + QueuePurgeOkBody.process(channelId, in, dispatcher); + break; case 0x00320028: - return QueueDeleteBody.process(channelId, in, dispatcher); + QueueDeleteBody.process(channelId, in, dispatcher); + break; case 0x00320029: - return QueueDeleteOkBody.process(channelId, in, dispatcher); + QueueDeleteOkBody.process(channelId, in, dispatcher); + break; case 0x00320032: - return QueueUnbindBody.process(channelId, in, dispatcher); + QueueUnbindBody.process(channelId, in, dispatcher); + break; case 0x00320033: - return dispatcher.queueUnbindOk(channelId); + dispatcher.receiveQueueUnbindOk(channelId); + break; // BASIC_CLASS: case 0x003c000a: - return BasicQosBody.process(channelId, in, dispatcher); + BasicQosBody.process(channelId, in, dispatcher); + break; case 0x003c000b: - return dispatcher.basicQosOk(channelId); + dispatcher.receiveBasicQosOk(channelId); + break; case 0x003c0014: - return BasicConsumeBody.process(channelId, in, dispatcher); + BasicConsumeBody.process(channelId, in, dispatcher); + break; case 0x003c0015: - return BasicConsumeOkBody.process(channelId, in, dispatcher); + BasicConsumeOkBody.process(channelId, in, dispatcher); + break; case 0x003c001e: - return BasicCancelBody.process(channelId, in, dispatcher); + BasicCancelBody.process(channelId, in, dispatcher); + break; case 0x003c001f: - return BasicCancelOkBody.process(channelId, in, dispatcher); + BasicCancelOkBody.process(channelId, in, dispatcher); + break; case 0x003c0028: - return BasicPublishBody.process(channelId, in, dispatcher); + BasicPublishBody.process(channelId, in, dispatcher); + break; case 0x003c0032: - return BasicReturnBody.process(channelId, in, dispatcher); + BasicReturnBody.process(channelId, in, dispatcher); + break; case 0x003c003c: - return BasicDeliverBody.process(channelId, in, dispatcher); + BasicDeliverBody.process(channelId, in, dispatcher); + break; case 0x003c0046: - return BasicGetBody.process(channelId, in, dispatcher); + BasicGetBody.process(channelId, in, dispatcher); + break; case 0x003c0047: - return BasicGetOkBody.process(channelId, in, dispatcher); + BasicGetOkBody.process(channelId, in, dispatcher); + break; case 0x003c0048: - return BasicGetEmptyBody.process(channelId, in, dispatcher); + BasicGetEmptyBody.process(channelId, in, dispatcher); + break; case 0x003c0050: - return BasicAckBody.process(channelId, in, dispatcher); + BasicAckBody.process(channelId, in, dispatcher); + break; case 0x003c005a: - return BasicRejectBody.process(channelId, in, dispatcher); + BasicRejectBody.process(channelId, in, dispatcher); + break; case 0x003c0064: - return BasicRecoverBody.process(channelId, in, protocolVersion, dispatcher); + BasicRecoverBody.process(channelId, in, dispatcher.getProtocolVersion(), dispatcher); + break; case 0x003c0065: - return dispatcher.basicRecoverSyncOk(channelId); + dispatcher.receiveBasicRecoverSyncOk(channelId); + break; case 0x003c0066: - return BasicRecoverSyncBody.process(channelId, in, dispatcher); + BasicRecoverSyncBody.process(channelId, in, dispatcher); + break; case 0x003c006e: - return BasicRecoverSyncBody.process(channelId, in, dispatcher); + BasicRecoverSyncBody.process(channelId, in, dispatcher); + break; case 0x003c006f: - return dispatcher.basicRecoverSyncOk(channelId); + dispatcher.receiveBasicRecoverSyncOk(channelId); + break; // TX_CLASS: case 0x005a000a: - return dispatcher.txSelect(channelId); + dispatcher.receiveTxSelect(channelId); + break; case 0x005a000b: - return dispatcher.txSelectOk(channelId); + dispatcher.receiveTxSelectOk(channelId); + break; case 0x005a0014: - return dispatcher.txCommit(channelId); + dispatcher.receiveTxCommit(channelId); + break; case 0x005a0015: - return dispatcher.txCommitOk(channelId); + dispatcher.receiveTxCommitOk(channelId); + break; case 0x005a001e: - return dispatcher.txRollback(channelId); + dispatcher.receiveTxRollback(channelId); + break; case 0x005a001f: - return dispatcher.txRollbackOk(channelId); + dispatcher.receiveTxRollbackOk(channelId); + break; default: - throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), protocolVersion); + throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), + dispatcher.getProtocolVersion()); } } - private AMQFrameDecodingException newUnknownMethodException(final int classId, final int methodId, ProtocolVersion protocolVersion) + private AMQFrameDecodingException newUnknownMethodException(final int classId, + final int methodId, + ProtocolVersion protocolVersion) { return new AMQFrameDecodingException(AMQConstant.COMMAND_INVALID, - "Method " + methodId + " unknown in AMQP version " + protocolVersion - + " (while trying to decode class " + classId + " method " + methodId + "."); + "Method " + + methodId + + " unknown in AMQP version " + + protocolVersion + + " (while trying to decode class " + + classId + + " method " + + methodId + + "."); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java index 9a386d4eb4..ce2a5a1317 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java @@ -165,9 +165,9 @@ public class AccessRequestBody extends AMQMethodBodyImpl implements EncodableAMQ return buf.toString(); } - public static T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { AMQShortString realm = buffer.readAMQShortString(); byte bitfield = buffer.readByte(); @@ -176,6 +176,6 @@ public class AccessRequestBody extends AMQMethodBodyImpl implements EncodableAMQ boolean active = (bitfield & 0x04) == 0x4 ; boolean write = (bitfield & 0x08) == 0x8 ; boolean read = (bitfield & 0x10) == 0x10 ; - return dispatcher.accessRequest(channelId, realm, exclusive, passive, active, write, read); + dispatcher.receiveAccessRequest(channelId, realm, exclusive, passive, active, write, read); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java index 8439df9e92..10be4d45c8 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java @@ -95,10 +95,10 @@ public class AccessRequestOkBody extends AMQMethodBodyImpl implements EncodableA return buf.toString(); } - public static T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException { int ticket = buffer.readUnsignedShort(); - return dispatcher.accessRequestOk(channelId, ticket); + dispatcher.receiveAccessRequestOk(channelId, ticket); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java index 956d10bf0a..70e3f10148 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java @@ -112,13 +112,13 @@ public class BasicAckBody extends AMQMethodBodyImpl implements EncodableAMQDataB return buf.toString(); } - public static T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { long deliveryTag = buffer.readLong(); boolean multiple = (buffer.readByte() & 0x01) != 0; - return dispatcher.basicAck(channelId, deliveryTag, multiple); + dispatcher.receiveBasicAck(channelId, deliveryTag, multiple); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java index 1c619fd5d4..6f74b3870a 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java @@ -113,13 +113,13 @@ public class BasicCancelBody extends AMQMethodBodyImpl implements EncodableAMQDa return buf.toString(); } - public static T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { AMQShortString consumerTag = buffer.readAMQShortString(); boolean noWait = (buffer.readByte() & 0x01) == 0x01; - return dispatcher.basicCancel(channelId, consumerTag, noWait); + dispatcher.receiveBasicCancel(channelId, consumerTag, noWait); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java index c85223cd4f..0e9bc52d66 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java @@ -96,10 +96,10 @@ public class BasicCancelOkBody extends AMQMethodBodyImpl implements EncodableAMQ return buf.toString(); } - public static T process(final int channelId, final MarkableDataInput in, final MethodProcessor dispatcher) + public static void process(final int channelId, final MarkableDataInput in, final MethodProcessor dispatcher) throws IOException { AMQShortString consumerTag = in.readAMQShortString(); - return dispatcher.basicCancelOk(channelId, consumerTag); + dispatcher.receiveBasicCancelOk(channelId, consumerTag); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java index 1d6ec46c9a..94396418fe 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java @@ -191,7 +191,7 @@ public class BasicConsumeBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } - public static T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { @@ -205,6 +205,6 @@ public class BasicConsumeBody extends AMQMethodBodyImpl implements EncodableAMQD boolean exclusive = (bitfield & 0x04) == 0x04; boolean nowait = (bitfield & 0x08) == 0x08; FieldTable arguments = EncodingUtils.readFieldTable(buffer); - return dispatcher.basicConsume(channelId, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments); + dispatcher.receiveBasicConsume(channelId, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java index b019574a6b..d42c722fdf 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java @@ -96,10 +96,10 @@ public class BasicConsumeOkBody extends AMQMethodBodyImpl implements EncodableAM return buf.toString(); } - public static T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException { AMQShortString consumerTag = buffer.readAMQShortString(); - return dispatcher.basicConsumeOk(channelId, consumerTag); + dispatcher.receiveBasicConsumeOk(channelId, consumerTag); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java index 76cd9bfff4..afa38d1852 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java @@ -152,9 +152,9 @@ public class BasicDeliverBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } - public static T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { AMQShortString consumerTag = buffer.readAMQShortString(); @@ -162,6 +162,6 @@ public class BasicDeliverBody extends AMQMethodBodyImpl implements EncodableAMQD boolean redelivered = (buffer.readByte() & 0x01) != 0; AMQShortString exchange = buffer.readAMQShortString(); AMQShortString routingKey = buffer.readAMQShortString(); - return dispatcher.basicDeliver(channelId, consumerTag, deliveryTag, redelivered, exchange, routingKey); + dispatcher.receiveBasicDeliver(channelId, consumerTag, deliveryTag, redelivered, exchange, routingKey); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java index 2ebde34648..93429b97d8 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java @@ -125,13 +125,13 @@ public class BasicGetBody extends AMQMethodBodyImpl implements EncodableAMQDataB return buf.toString(); } - public static T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException { int ticket = buffer.readUnsignedShort(); AMQShortString queue = buffer.readAMQShortString(); boolean noAck = (buffer.readByte() & 0x01) != 0; - return dispatcher.basicGet(channelId, queue, noAck); + dispatcher.receiveBasicGet(channelId, queue, noAck); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java index 508c3f8e66..a42df6bcc7 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java @@ -96,11 +96,11 @@ public class BasicGetEmptyBody extends AMQMethodBodyImpl implements EncodableAMQ return buf.toString(); } - public static T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { AMQShortString clusterId = buffer.readAMQShortString(); - return dispatcher.basicGetEmpty(channelId); + dispatcher.receiveBasicGetEmpty(channelId); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java index 4020d8fb23..b8af656a35 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java @@ -151,15 +151,15 @@ public class BasicGetOkBody extends AMQMethodBodyImpl implements EncodableAMQDat return buf.toString(); } - public static T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { long deliveryTag = buffer.readLong(); boolean redelivered = (buffer.readByte() & 0x01) != 0; AMQShortString exchange = buffer.readAMQShortString(); AMQShortString routingKey = buffer.readAMQShortString(); long messageCount = EncodingUtils.readUnsignedInteger(buffer); - return dispatcher.basicGetOk(channelId, deliveryTag, redelivered, exchange, routingKey, messageCount); + dispatcher.receiveBasicGetOk(channelId, deliveryTag, redelivered, exchange, routingKey, messageCount); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java index 7920da8405..910942c2f1 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java @@ -151,9 +151,9 @@ public class BasicPublishBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } - public static T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { int ticket = buffer.readUnsignedShort(); @@ -163,6 +163,6 @@ public class BasicPublishBody extends AMQMethodBodyImpl implements EncodableAMQD boolean mandatory = (bitfield & 0x01) != 0; boolean immediate = (bitfield & 0x02) != 0; - return dispatcher.basicPublish(channelId, exchange, routingKey, mandatory, immediate); + dispatcher.receiveBasicPublish(channelId, exchange, routingKey, mandatory, immediate); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java index 0843c5ccd7..fb6b6956c6 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java @@ -124,14 +124,14 @@ public class BasicQosBody extends AMQMethodBodyImpl implements EncodableAMQDataB return buf.toString(); } - public static T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { long prefetchSize = EncodingUtils.readUnsignedInteger(buffer); int prefetchCount = buffer.readUnsignedShort(); boolean global = (buffer.readByte() & 0x01) == 0x01; - return dispatcher.basicQos(channelId, prefetchSize, prefetchCount, global); + dispatcher.receiveBasicQos(channelId, prefetchSize, prefetchCount, global); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java index 739470c658..2519f25fbe 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java @@ -100,14 +100,14 @@ public class BasicRecoverBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } - public static T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput in, final ProtocolVersion protocolVersion, - final MethodProcessor dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { boolean requeue = (in.readByte() & 0x01) == 0x01; boolean sync = (ProtocolVersion.v8_0.equals(protocolVersion)); - return dispatcher.basicRecover(channelId, requeue, sync); + dispatcher.receiveBasicRecover(channelId, requeue, sync); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java index 5826bd9d16..16c9798977 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java @@ -103,11 +103,11 @@ public class BasicRecoverSyncBody extends AMQMethodBodyImpl implements Encodable return buf.toString(); } - public static T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput in, - final MethodProcessor dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { boolean requeue = (in.readByte() & 0x01) == 0x01; - return dispatcher.basicRecover(channelId, requeue, true); + dispatcher.receiveBasicRecover(channelId, requeue, true); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java index 83f2727a51..8e1ebf4013 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java @@ -112,13 +112,13 @@ public class BasicRejectBody extends AMQMethodBodyImpl implements EncodableAMQDa return buf.toString(); } - public static T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { long deliveryTag = buffer.readLong(); boolean requeue = (buffer.readByte() & 0x01) != 0; - return dispatcher.basicReject(channelId, deliveryTag, requeue); + dispatcher.receiveBasicReject(channelId, deliveryTag, requeue); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java index 67d6c77312..cff9914705 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java @@ -134,15 +134,15 @@ public class BasicReturnBody extends AMQMethodBodyImpl implements EncodableAMQDa return buf.toString(); } - public static T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { int replyCode = buffer.readUnsignedShort(); AMQShortString replyText = buffer.readAMQShortString(); AMQShortString exchange = buffer.readAMQShortString(); AMQShortString routingKey = buffer.readAMQShortString(); - return dispatcher.basicReturn(channelId, replyCode, replyText, exchange, routingKey); + dispatcher.receiveBasicReturn(channelId, replyCode, replyText, exchange, routingKey); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java index d5c535b099..11dcffc175 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java @@ -121,13 +121,13 @@ public class ChannelAlertBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } - public static T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { int replyCode = buffer.readUnsignedShort(); AMQShortString replyText = buffer.readAMQShortString(); FieldTable details = EncodingUtils.readFieldTable(buffer); - return dispatcher.channelAlert(channelId, replyCode, replyText, details); + dispatcher.receiveChannelAlert(channelId, replyCode, replyText, details); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java index ea1536ed2b..a4f54fbe7d 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java @@ -132,15 +132,15 @@ public class ChannelCloseBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } - public static T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { int replyCode = buffer.readUnsignedShort(); AMQShortString replyText = buffer.readAMQShortString(); int classId = buffer.readUnsignedShort(); int methodId = buffer.readUnsignedShort(); - return dispatcher.channelClose(channelId, replyCode, replyText, classId, methodId); + dispatcher.receiveChannelClose(channelId, replyCode, replyText, classId, methodId); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java index e70eb0ea35..c975744d9f 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java @@ -92,11 +92,11 @@ public class ChannelFlowBody extends AMQMethodBodyImpl implements EncodableAMQDa return buf.toString(); } - public static T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { boolean active = (buffer.readByte() & 0x01) == 0x01; - return dispatcher.channelFlow(channelId, active); + dispatcher.receiveChannelFlow(channelId, active); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java index 13bdf332d2..a62c6155f8 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java @@ -93,10 +93,10 @@ public class ChannelFlowOkBody extends AMQMethodBodyImpl implements EncodableAMQ return buf.toString(); } - public static T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException { boolean active = (buffer.readByte() & 0x01) == 0x01; - return dispatcher.channelFlowOk(channelId, active); + dispatcher.receiveChannelFlowOk(channelId, active); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java index f96eb9344b..9da45d3d70 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java @@ -82,11 +82,11 @@ public class ChannelOpenBody extends AMQMethodBodyImpl implements EncodableAMQDa return "[ChannelOpenBody] "; } - public static T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { buffer.readAMQShortString(); - return dispatcher.channelOpen(channelId); + dispatcher.receiveChannelOpen(channelId); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java index 5cf4d91970..775a08fbd4 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java @@ -96,16 +96,16 @@ public class ChannelOpenOkBody extends AMQMethodBodyImpl implements EncodableAMQ return "[ChannelOpenOkBody]"; } - public static T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput in, final ProtocolVersion protocolVersion, - final MethodProcessor dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { if(!ProtocolVersion.v8_0.equals(protocolVersion)) { EncodingUtils.readBytes(in); } - return dispatcher.channelOpenOk(channelId); + dispatcher.receiveChannelOpenOk(channelId); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java index 02f214cee9..546cf5fa0a 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java @@ -134,12 +134,12 @@ public class ConnectionCloseBody extends AMQMethodBodyImpl implements EncodableA return buf.toString(); } - public static T process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException { int replyCode = buffer.readUnsignedShort(); AMQShortString replyText = buffer.readAMQShortString(); int classId = buffer.readUnsignedShort(); int methodId = buffer.readUnsignedShort(); - return dispatcher.connectionClose(replyCode, replyText, classId, methodId); + dispatcher.receiveConnectionClose(replyCode, replyText, classId, methodId); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java index f9f55446dd..0e685deb7c 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java @@ -121,12 +121,12 @@ public class ConnectionOpenBody extends AMQMethodBodyImpl implements EncodableAM return buf.toString(); } - public static T process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException { AMQShortString virtualHost = buffer.readAMQShortString(); AMQShortString capabilities = buffer.readAMQShortString(); boolean insist = (buffer.readByte() & 0x01) == 0x01; - return dispatcher.connectionOpen(virtualHost, capabilities, insist); + dispatcher.receiveConnectionOpen(virtualHost, capabilities, insist); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java index 3f04da7a29..6d1e80c624 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java @@ -96,10 +96,10 @@ public class ConnectionOpenOkBody extends AMQMethodBodyImpl implements Encodable return buf.toString(); } - public static T process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException { AMQShortString knownHosts = buffer.readAMQShortString(); - return dispatcher.connectionOpenOk(knownHosts); + dispatcher.receiveConnectionOpenOk(knownHosts); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java index 80c655683f..a9b9a43b1a 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java @@ -108,10 +108,10 @@ public class ConnectionRedirectBody extends AMQMethodBodyImpl implements Encodab return buf.toString(); } - public static T process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException { AMQShortString host = buffer.readAMQShortString(); AMQShortString knownHosts = buffer.readAMQShortString(); - return dispatcher.connectionRedirect(host, knownHosts); + dispatcher.receiveConnectionRedirect(host, knownHosts); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java index ca208d5a89..1f7f2b0221 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java @@ -96,11 +96,11 @@ public class ConnectionSecureBody extends AMQMethodBodyImpl implements Encodable return buf.toString(); } - public static T process(final MarkableDataInput in, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput in, final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { byte[] challenge = EncodingUtils.readBytes(in); - return dispatcher.connectionSecure(challenge); + dispatcher.receiveConnectionSecure(challenge); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java index 0a2bfa613e..9a4668a9c7 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java @@ -96,9 +96,9 @@ public class ConnectionSecureOkBody extends AMQMethodBodyImpl implements Encodab return buf.toString(); } - public static T process(final MarkableDataInput in, final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput in, final MethodProcessor dispatcher) throws IOException { byte[] response = EncodingUtils.readBytes(in); - return dispatcher.connectionSecureOk(response); + dispatcher.receiveConnectionSecureOk(response); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java index 17a568d737..4f47f0632f 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java @@ -136,7 +136,7 @@ public class ConnectionStartBody extends AMQMethodBodyImpl implements EncodableA return buf.toString(); } - public static T process(final MarkableDataInput in, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput in, final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { short versionMajor = (short) in.readUnsignedByte(); @@ -146,6 +146,6 @@ public class ConnectionStartBody extends AMQMethodBodyImpl implements EncodableA byte[] locales = EncodingUtils.readBytes(in); - return dispatcher.connectionStart(versionMajor, versionMinor, serverProperties, mechanisms, locales); + dispatcher.receiveConnectionStart(versionMajor, versionMinor, serverProperties, mechanisms, locales); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java index ba8182e569..da3d0a2c56 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java @@ -126,7 +126,7 @@ public class ConnectionStartOkBody extends AMQMethodBodyImpl implements Encodabl return buf.toString(); } - public static T process(final MarkableDataInput in, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput in, final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { @@ -135,6 +135,6 @@ public class ConnectionStartOkBody extends AMQMethodBodyImpl implements Encodabl byte[] response = EncodingUtils.readBytes(in); AMQShortString locale = in.readAMQShortString(); - return dispatcher.connectionStartOk(clientProperties, mechanism, response, locale); + dispatcher.receiveConnectionStartOk(clientProperties, mechanism, response, locale); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java index 2ca8e57e18..3383fd889a 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java @@ -119,12 +119,12 @@ public class ConnectionTuneBody extends AMQMethodBodyImpl implements EncodableAM return buf.toString(); } - public static T process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException { int channelMax = buffer.readUnsignedShort(); long frameMax = EncodingUtils.readUnsignedInteger(buffer); int heartbeat = buffer.readUnsignedShort(); - return dispatcher.connectionTune(channelMax, frameMax, heartbeat); + dispatcher.receiveConnectionTune(channelMax, frameMax, heartbeat); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java index 7a259b6419..f695eda2c4 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java @@ -119,12 +119,12 @@ public class ConnectionTuneOkBody extends AMQMethodBodyImpl implements Encodable return buf.toString(); } - public static T process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException { int channelMax = buffer.readUnsignedShort(); long frameMax = EncodingUtils.readUnsignedInteger(buffer); int heartbeat = buffer.readUnsignedShort(); - return dispatcher.connectionTuneOk(channelMax, frameMax, heartbeat); + dispatcher.receiveConnectionTuneOk(channelMax, frameMax, heartbeat); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index dc345a6cc6..01beb3af77 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -92,14 +92,14 @@ public class ContentBody implements AMQBody return _payload; } - public static T process(final int channel, + public static void process(final int channel, final MarkableDataInput in, - final MethodProcessor methodProcessor, final long bodySize) + final MethodProcessor methodProcessor, final long bodySize) throws IOException { byte[] payload = new byte[(int)bodySize]; in.readFully(payload); - return methodProcessor.messageContent(channel, payload); + methodProcessor.receiveMessageContent(channel, payload); } private static class BufferContentBody implements AMQBody diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java index 081b4bdfee..0d54e09ae5 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -155,9 +155,9 @@ public class ContentHeaderBody implements AMQBody _bodySize = bodySize; } - public static T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor methodProcessor, final long size) + final MethodProcessor methodProcessor, final long size) throws IOException, AMQFrameDecodingException { @@ -175,6 +175,6 @@ public class ContentHeaderBody implements AMQBody properties = new BasicContentHeaderProperties(); properties.populatePropertiesFromBuffer(buffer, propertyFlags, (int)(size-14)); - return methodProcessor.messageHeader(channelId, properties, bodySize); + methodProcessor.receiveMessageHeader(channelId, properties, bodySize); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java index 8244768fb6..7548db6e93 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java @@ -122,13 +122,13 @@ public class ExchangeBoundBody extends AMQMethodBodyImpl implements EncodableAMQ return buf.toString(); } - public static T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException { AMQShortString exchange = buffer.readAMQShortString(); AMQShortString routingKey = buffer.readAMQShortString(); AMQShortString queue = buffer.readAMQShortString(); - return dispatcher.exchangeBound(channelId, exchange, routingKey, queue); + dispatcher.receiveExchangeBound(channelId, exchange, routingKey, queue); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java index 2d89a9f467..869994561f 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java @@ -108,12 +108,12 @@ public class ExchangeBoundOkBody extends AMQMethodBodyImpl implements EncodableA return buf.toString(); } - public static T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException { int replyCode = buffer.readUnsignedShort(); AMQShortString replyText = buffer.readAMQShortString(); - return dispatcher.exchangeBoundOk(channelId, replyCode, replyText); + dispatcher.receiveExchangeBoundOk(channelId, replyCode, replyText); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java index f96e6d382e..06e590f8e5 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java @@ -204,9 +204,9 @@ public class ExchangeDeclareBody extends AMQMethodBodyImpl implements EncodableA return buf.toString(); } - public static T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException + final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { int ticket = buffer.readUnsignedShort(); @@ -219,6 +219,14 @@ public class ExchangeDeclareBody extends AMQMethodBodyImpl implements EncodableA boolean internal = (bitfield & 0x8) == 0x8; boolean nowait = (bitfield & 0x10) == 0x10; FieldTable arguments = EncodingUtils.readFieldTable(buffer); - return dispatcher.exchangeDeclare(channelId, exchange, type, passive, durable, autoDelete, internal, nowait, arguments); + dispatcher.receiveExchangeDeclare(channelId, + exchange, + type, + passive, + durable, + autoDelete, + internal, + nowait, + arguments); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java index 771fa63063..4a30e25502 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java @@ -138,7 +138,7 @@ public class ExchangeDeleteBody extends AMQMethodBodyImpl implements EncodableAM return buf.toString(); } - public static T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException { @@ -147,6 +147,6 @@ public class ExchangeDeleteBody extends AMQMethodBodyImpl implements EncodableAM byte bitfield = buffer.readByte(); boolean ifUnused = (bitfield & 0x01) == 0x01; boolean nowait = (bitfield & 0x02) == 0x02; - return dispatcher.exchangeDelete(channelId, exchange, ifUnused, nowait); + dispatcher.receiveExchangeDelete(channelId, exchange, ifUnused, nowait); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java index c8b7d2639d..348df8b24d 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java +++ b/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java @@ -20,484 +20,506 @@ */ package org.apache.qpid.framing; -public class FrameCreatingMethodProcessor implements MethodProcessor +import java.util.ArrayList; +import java.util.List; + +public class FrameCreatingMethodProcessor implements MethodProcessor { - private final MethodRegistry _methodRegistry; + private ProtocolVersion _protocolVersion; + + private final List _processedMethods = new ArrayList<>(); - FrameCreatingMethodProcessor(final MethodRegistry methodRegistry) + public FrameCreatingMethodProcessor(final ProtocolVersion protocolVersion) { - _methodRegistry = methodRegistry; + _protocolVersion = protocolVersion; } + public List getProcessedMethods() + { + return _processedMethods; + } + @Override - public AMQFrame connectionStart(final short versionMajor, - final short versionMinor, - final FieldTable serverProperties, - final byte[] mechanisms, - final byte[] locales) + public void receiveConnectionStart(final short versionMajor, + final short versionMinor, + final FieldTable serverProperties, + final byte[] mechanisms, + final byte[] locales) { - return new AMQFrame(0, new ConnectionStartBody(versionMajor, versionMinor, serverProperties, mechanisms, locales)); + _processedMethods.add(new AMQFrame(0, new ConnectionStartBody(versionMajor, versionMinor, serverProperties, mechanisms, locales))); } @Override - public AMQFrame connectionStartOk(final FieldTable clientProperties, - final AMQShortString mechanism, - final byte[] response, - final AMQShortString locale) + public void receiveConnectionStartOk(final FieldTable clientProperties, + final AMQShortString mechanism, + final byte[] response, + final AMQShortString locale) { - return new AMQFrame(0, new ConnectionStartOkBody(clientProperties, mechanism, response, locale)); + _processedMethods.add(new AMQFrame(0, new ConnectionStartOkBody(clientProperties, mechanism, response, locale))); } @Override - public AMQFrame txSelect(final int channelId) + public void receiveTxSelect(final int channelId) { - return new AMQFrame(channelId, TxSelectBody.INSTANCE); + _processedMethods.add(new AMQFrame(channelId, TxSelectBody.INSTANCE)); } @Override - public AMQFrame txSelectOk(final int channelId) + public void receiveTxSelectOk(final int channelId) { - return new AMQFrame(channelId, TxSelectOkBody.INSTANCE); + _processedMethods.add(new AMQFrame(channelId, TxSelectOkBody.INSTANCE)); } @Override - public AMQFrame txCommit(final int channelId) + public void receiveTxCommit(final int channelId) { - return new AMQFrame(channelId, TxCommitBody.INSTANCE); + _processedMethods.add(new AMQFrame(channelId, TxCommitBody.INSTANCE)); } @Override - public AMQFrame txCommitOk(final int channelId) + public void receiveTxCommitOk(final int channelId) { - return new AMQFrame(channelId, TxCommitOkBody.INSTANCE); + _processedMethods.add(new AMQFrame(channelId, TxCommitOkBody.INSTANCE)); } @Override - public AMQFrame txRollback(final int channelId) + public void receiveTxRollback(final int channelId) { - return new AMQFrame(channelId, TxRollbackBody.INSTANCE); + _processedMethods.add(new AMQFrame(channelId, TxRollbackBody.INSTANCE)); } @Override - public AMQFrame txRollbackOk(final int channelId) + public void receiveTxRollbackOk(final int channelId) { - return new AMQFrame(channelId, TxRollbackOkBody.INSTANCE); + _processedMethods.add(new AMQFrame(channelId, TxRollbackOkBody.INSTANCE)); } @Override - public AMQFrame connectionSecure(final byte[] challenge) + public void receiveConnectionSecure(final byte[] challenge) { - return new AMQFrame(0, new ConnectionSecureBody(challenge)); + _processedMethods.add(new AMQFrame(0, new ConnectionSecureBody(challenge))); } @Override - public AMQFrame connectionSecureOk(final byte[] response) + public void receiveConnectionSecureOk(final byte[] response) { - return new AMQFrame(0, new ConnectionSecureOkBody(response)); + _processedMethods.add(new AMQFrame(0, new ConnectionSecureOkBody(response))); } @Override - public AMQFrame connectionTune(final int channelMax, final long frameMax, final int heartbeat) + public void receiveConnectionTune(final int channelMax, final long frameMax, final int heartbeat) { - return new AMQFrame(0, new ConnectionTuneBody(channelMax, frameMax, heartbeat)); + _processedMethods.add(new AMQFrame(0, new ConnectionTuneBody(channelMax, frameMax, heartbeat))); } @Override - public AMQFrame connectionTuneOk(final int channelMax, final long frameMax, final int heartbeat) + public void receiveConnectionTuneOk(final int channelMax, final long frameMax, final int heartbeat) { - return new AMQFrame(0, new ConnectionTuneOkBody(channelMax, frameMax, heartbeat)); + _processedMethods.add(new AMQFrame(0, new ConnectionTuneOkBody(channelMax, frameMax, heartbeat))); } @Override - public AMQFrame connectionOpen(final AMQShortString virtualHost, - final AMQShortString capabilities, - final boolean insist) + public void receiveConnectionOpen(final AMQShortString virtualHost, + final AMQShortString capabilities, + final boolean insist) { - return new AMQFrame(0, new ConnectionOpenBody(virtualHost, capabilities, insist)); + _processedMethods.add(new AMQFrame(0, new ConnectionOpenBody(virtualHost, capabilities, insist))); } @Override - public AMQFrame connectionOpenOk(final AMQShortString knownHosts) + public void receiveConnectionOpenOk(final AMQShortString knownHosts) { - return new AMQFrame(0, new ConnectionOpenOkBody(knownHosts)); + _processedMethods.add(new AMQFrame(0, new ConnectionOpenOkBody(knownHosts))); } @Override - public AMQFrame connectionRedirect(final AMQShortString host, final AMQShortString knownHosts) + public void receiveConnectionRedirect(final AMQShortString host, final AMQShortString knownHosts) { - return new AMQFrame(0, new ConnectionRedirectBody(getProtocolVersion(), host, knownHosts)); + _processedMethods.add(new AMQFrame(0, new ConnectionRedirectBody(getProtocolVersion(), host, knownHosts))); } @Override - public AMQFrame connectionClose(final int replyCode, - final AMQShortString replyText, - final int classId, - final int methodId) + public void receiveConnectionClose(final int replyCode, + final AMQShortString replyText, + final int classId, + final int methodId) { - return new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), replyCode, replyText, classId, methodId)); + _processedMethods.add(new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), replyCode, replyText, classId, methodId))); } @Override - public AMQFrame connectionCloseOk() + public void receiveConnectionCloseOk() { - return new AMQFrame(0, ProtocolVersion.v8_0.equals(getProtocolVersion()) + _processedMethods.add(new AMQFrame(0, ProtocolVersion.v8_0.equals(getProtocolVersion()) ? ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_8 - : ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_9); + : ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_9)); } @Override - public AMQFrame channelOpen(final int channelId) + public void receiveChannelOpen(final int channelId) { - return new AMQFrame(channelId, new ChannelOpenBody()); + _processedMethods.add(new AMQFrame(channelId, new ChannelOpenBody())); } @Override - public AMQFrame channelOpenOk(final int channelId) + public void receiveChannelOpenOk(final int channelId) { - return new AMQFrame(channelId, ProtocolVersion.v8_0.equals(getProtocolVersion()) + _processedMethods.add(new AMQFrame(channelId, ProtocolVersion.v8_0.equals(getProtocolVersion()) ? ChannelOpenOkBody.INSTANCE_0_8 - : ChannelOpenOkBody.INSTANCE_0_9); + : ChannelOpenOkBody.INSTANCE_0_9)); } @Override - public AMQFrame channelFlow(final int channelId, final boolean active) + public void receiveChannelFlow(final int channelId, final boolean active) { - return new AMQFrame(channelId, new ChannelFlowBody(active)); + _processedMethods.add(new AMQFrame(channelId, new ChannelFlowBody(active))); } @Override - public AMQFrame channelFlowOk(final int channelId, final boolean active) + public void receiveChannelFlowOk(final int channelId, final boolean active) { - return new AMQFrame(channelId, new ChannelFlowOkBody(active)); + _processedMethods.add(new AMQFrame(channelId, new ChannelFlowOkBody(active))); } @Override - public AMQFrame channelAlert(final int channelId, - final int replyCode, - final AMQShortString replyText, - final FieldTable details) + public void receiveChannelAlert(final int channelId, + final int replyCode, + final AMQShortString replyText, + final FieldTable details) { - return new AMQFrame(channelId, new ChannelAlertBody(replyCode, replyText, details)); + _processedMethods.add(new AMQFrame(channelId, new ChannelAlertBody(replyCode, replyText, details))); } @Override - public AMQFrame channelClose(final int channelId, - final int replyCode, - final AMQShortString replyText, - final int classId, - final int methodId) + public void receiveChannelClose(final int channelId, + final int replyCode, + final AMQShortString replyText, + final int classId, + final int methodId) { - return new AMQFrame(channelId, new ChannelCloseBody(replyCode, replyText, classId, methodId)); + _processedMethods.add(new AMQFrame(channelId, new ChannelCloseBody(replyCode, replyText, classId, methodId))); } @Override - public AMQFrame channelCloseOk(final int channelId) + public void receiveChannelCloseOk(final int channelId) { - return new AMQFrame(channelId, ChannelCloseOkBody.INSTANCE); + _processedMethods.add(new AMQFrame(channelId, ChannelCloseOkBody.INSTANCE)); } @Override - public AMQFrame accessRequest(final int channelId, - final AMQShortString realm, - final boolean exclusive, - final boolean passive, - final boolean active, - final boolean write, - final boolean read) + public void receiveAccessRequest(final int channelId, + final AMQShortString realm, + final boolean exclusive, + final boolean passive, + final boolean active, + final boolean write, + final boolean read) { - return new AMQFrame(channelId, new AccessRequestBody(realm, exclusive, passive, active, write, read)); + _processedMethods.add(new AMQFrame(channelId, new AccessRequestBody(realm, exclusive, passive, active, write, read))); } @Override - public AMQFrame accessRequestOk(final int channelId, final int ticket) + public void receiveAccessRequestOk(final int channelId, final int ticket) { - return new AMQFrame(channelId, new AccessRequestOkBody(ticket)); + _processedMethods.add(new AMQFrame(channelId, new AccessRequestOkBody(ticket))); } @Override - public AMQFrame exchangeDeclare(final int channelId, - final AMQShortString exchange, - final AMQShortString type, - final boolean passive, - final boolean durable, - final boolean autoDelete, - final boolean internal, - final boolean nowait, final FieldTable arguments) + public void receiveExchangeDeclare(final int channelId, + final AMQShortString exchange, + final AMQShortString type, + final boolean passive, + final boolean durable, + final boolean autoDelete, + final boolean internal, + final boolean nowait, final FieldTable arguments) { - return new AMQFrame(channelId, new ExchangeDeclareBody(0, exchange, type, passive, durable, autoDelete, internal, nowait, arguments)); + _processedMethods.add(new AMQFrame(channelId, new ExchangeDeclareBody(0, exchange, type, passive, durable, autoDelete, internal, nowait, arguments))); } @Override - public AMQFrame exchangeDeclareOk(final int channelId) + public void receiveExchangeDeclareOk(final int channelId) { - return new AMQFrame(channelId, new ExchangeDeclareOkBody()); + _processedMethods.add(new AMQFrame(channelId, new ExchangeDeclareOkBody())); } @Override - public AMQFrame exchangeDelete(final int channelId, - final AMQShortString exchange, - final boolean ifUnused, - final boolean nowait) + public void receiveExchangeDelete(final int channelId, + final AMQShortString exchange, + final boolean ifUnused, + final boolean nowait) { - return new AMQFrame(channelId, new ExchangeDeleteBody(0, exchange, ifUnused, nowait)); + _processedMethods.add(new AMQFrame(channelId, new ExchangeDeleteBody(0, exchange, ifUnused, nowait))); } @Override - public AMQFrame exchangeDeleteOk(final int channelId) + public void receiveExchangeDeleteOk(final int channelId) { - return new AMQFrame(channelId, new ExchangeDeleteOkBody()); + _processedMethods.add(new AMQFrame(channelId, new ExchangeDeleteOkBody())); } @Override - public AMQFrame exchangeBound(final int channelId, - final AMQShortString exchange, - final AMQShortString routingKey, - final AMQShortString queue) + public void receiveExchangeBound(final int channelId, + final AMQShortString exchange, + final AMQShortString routingKey, + final AMQShortString queue) { - return new AMQFrame(channelId, new ExchangeBoundBody(exchange, routingKey, queue)); + _processedMethods.add(new AMQFrame(channelId, new ExchangeBoundBody(exchange, routingKey, queue))); } @Override - public AMQFrame exchangeBoundOk(final int channelId, final int replyCode, final AMQShortString replyText) + public void receiveExchangeBoundOk(final int channelId, final int replyCode, final AMQShortString replyText) { - return new AMQFrame(channelId, new ExchangeBoundOkBody(replyCode, replyText)); + _processedMethods.add(new AMQFrame(channelId, new ExchangeBoundOkBody(replyCode, replyText))); } @Override - public AMQFrame queueBindOk(final int channelId) + public void receiveQueueBindOk(final int channelId) { - return new AMQFrame(channelId, new QueueBindOkBody()); + _processedMethods.add(new AMQFrame(channelId, new QueueBindOkBody())); } @Override - public AMQFrame queueUnbindOk(final int channelId) + public void receiveQueueUnbindOk(final int channelId) { - return new AMQFrame(channelId, new QueueUnbindOkBody()); + _processedMethods.add(new AMQFrame(channelId, new QueueUnbindOkBody())); } @Override - public AMQFrame queueDeclare(final int channelId, - final AMQShortString queue, - final boolean passive, - final boolean durable, - final boolean exclusive, - final boolean autoDelete, - final boolean nowait, - final FieldTable arguments) + public void receiveQueueDeclare(final int channelId, + final AMQShortString queue, + final boolean passive, + final boolean durable, + final boolean exclusive, + final boolean autoDelete, + final boolean nowait, + final FieldTable arguments) { - return new AMQFrame(channelId, new QueueDeclareBody(0, queue, passive, durable, exclusive, autoDelete, nowait, arguments)); + _processedMethods.add(new AMQFrame(channelId, new QueueDeclareBody(0, queue, passive, durable, exclusive, autoDelete, nowait, arguments))); } @Override - public AMQFrame queueDeclareOk(final int channelId, - final AMQShortString queue, - final long messageCount, - final long consumerCount) + public void receiveQueueDeclareOk(final int channelId, + final AMQShortString queue, + final long messageCount, + final long consumerCount) { - return new AMQFrame(channelId, new QueueDeclareOkBody(queue, messageCount, consumerCount)); + _processedMethods.add(new AMQFrame(channelId, new QueueDeclareOkBody(queue, messageCount, consumerCount))); } @Override - public AMQFrame queueBind(final int channelId, - final AMQShortString queue, - final AMQShortString exchange, - final AMQShortString bindingKey, - final boolean nowait, - final FieldTable arguments) + public void receiveQueueBind(final int channelId, + final AMQShortString queue, + final AMQShortString exchange, + final AMQShortString bindingKey, + final boolean nowait, + final FieldTable arguments) { - return new AMQFrame(channelId, new QueueBindBody(0, queue, exchange, bindingKey, nowait, arguments)); + _processedMethods.add(new AMQFrame(channelId, new QueueBindBody(0, queue, exchange, bindingKey, nowait, arguments))); } @Override - public AMQFrame queuePurge(final int channelId, final AMQShortString queue, final boolean nowait) + public void receiveQueuePurge(final int channelId, final AMQShortString queue, final boolean nowait) { - return new AMQFrame(channelId, new QueuePurgeBody(0, queue, nowait)); + _processedMethods.add(new AMQFrame(channelId, new QueuePurgeBody(0, queue, nowait))); } @Override - public AMQFrame queuePurgeOk(final int channelId, final long messageCount) + public void receiveQueuePurgeOk(final int channelId, final long messageCount) { - return new AMQFrame(channelId, new QueuePurgeOkBody(messageCount)); + _processedMethods.add(new AMQFrame(channelId, new QueuePurgeOkBody(messageCount))); } @Override - public AMQFrame queueDelete(final int channelId, - final AMQShortString queue, - final boolean ifUnused, - final boolean ifEmpty, - final boolean nowait) + public void receiveQueueDelete(final int channelId, + final AMQShortString queue, + final boolean ifUnused, + final boolean ifEmpty, + final boolean nowait) { - return new AMQFrame(channelId, new QueueDeleteBody(0, queue, ifUnused, ifEmpty, nowait)); + _processedMethods.add(new AMQFrame(channelId, new QueueDeleteBody(0, queue, ifUnused, ifEmpty, nowait))); } @Override - public AMQFrame queueDeleteOk(final int channelId, final long messageCount) + public void receiveQueueDeleteOk(final int channelId, final long messageCount) { - return new AMQFrame(channelId, new QueueDeleteOkBody(messageCount)); + _processedMethods.add(new AMQFrame(channelId, new QueueDeleteOkBody(messageCount))); } @Override - public AMQFrame queueUnbind(final int channelId, - final AMQShortString queue, - final AMQShortString exchange, - final AMQShortString bindingKey, - final FieldTable arguments) + public void receiveQueueUnbind(final int channelId, + final AMQShortString queue, + final AMQShortString exchange, + final AMQShortString bindingKey, + final FieldTable arguments) { - return new AMQFrame(channelId, new QueueUnbindBody(0, queue, exchange, bindingKey, arguments)); + _processedMethods.add(new AMQFrame(channelId, new QueueUnbindBody(0, queue, exchange, bindingKey, arguments))); } @Override - public AMQFrame basicRecoverSyncOk(final int channelId) + public void receiveBasicRecoverSyncOk(final int channelId) { - return new AMQFrame(channelId, new BasicRecoverSyncOkBody(getProtocolVersion())); + _processedMethods.add(new AMQFrame(channelId, new BasicRecoverSyncOkBody(getProtocolVersion()))); } @Override - public AMQFrame basicRecover(final int channelId, final boolean requeue, final boolean sync) + public void receiveBasicRecover(final int channelId, final boolean requeue, final boolean sync) { if(ProtocolVersion.v8_0.equals(getProtocolVersion()) || !sync) { - return new AMQFrame(channelId, new BasicRecoverBody(requeue)); + _processedMethods.add(new AMQFrame(channelId, new BasicRecoverBody(requeue))); } else { - return new AMQFrame(channelId, new BasicRecoverSyncBody(getProtocolVersion(), requeue)); + _processedMethods.add(new AMQFrame(channelId, new BasicRecoverSyncBody(getProtocolVersion(), requeue))); } } @Override - public AMQFrame basicQos(final int channelId, - final long prefetchSize, - final int prefetchCount, - final boolean global) + public void receiveBasicQos(final int channelId, + final long prefetchSize, + final int prefetchCount, + final boolean global) { - return new AMQFrame(channelId, new BasicQosBody(prefetchSize, prefetchCount, global)); + _processedMethods.add(new AMQFrame(channelId, new BasicQosBody(prefetchSize, prefetchCount, global))); } @Override - public AMQFrame basicQosOk(final int channelId) + public void receiveBasicQosOk(final int channelId) { - return new AMQFrame(channelId, new BasicQosOkBody()); + _processedMethods.add(new AMQFrame(channelId, new BasicQosOkBody())); } @Override - public AMQFrame basicConsume(final int channelId, - final AMQShortString queue, - final AMQShortString consumerTag, - final boolean noLocal, - final boolean noAck, - final boolean exclusive, - final boolean nowait, - final FieldTable arguments) + public void receiveBasicConsume(final int channelId, + final AMQShortString queue, + final AMQShortString consumerTag, + final boolean noLocal, + final boolean noAck, + final boolean exclusive, + final boolean nowait, + final FieldTable arguments) { - return new AMQFrame(channelId, new BasicConsumeBody(0, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments)); + _processedMethods.add(new AMQFrame(channelId, new BasicConsumeBody(0, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments))); } @Override - public AMQFrame basicConsumeOk(final int channelId, final AMQShortString consumerTag) + public void receiveBasicConsumeOk(final int channelId, final AMQShortString consumerTag) { - return new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag)); + _processedMethods.add(new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag))); } @Override - public AMQFrame basicCancel(final int channelId, final AMQShortString consumerTag, final boolean noWait) + public void receiveBasicCancel(final int channelId, final AMQShortString consumerTag, final boolean noWait) { - return new AMQFrame(channelId, new BasicCancelBody(consumerTag, noWait)); + _processedMethods.add(new AMQFrame(channelId, new BasicCancelBody(consumerTag, noWait))); } @Override - public AMQFrame basicCancelOk(final int channelId, final AMQShortString consumerTag) + public void receiveBasicCancelOk(final int channelId, final AMQShortString consumerTag) { - return new AMQFrame(channelId, new BasicCancelOkBody(consumerTag)); + _processedMethods.add(new AMQFrame(channelId, new BasicCancelOkBody(consumerTag))); } @Override - public AMQFrame basicPublish(final int channelId, - final AMQShortString exchange, - final AMQShortString routingKey, - final boolean mandatory, - final boolean immediate) + public void receiveBasicPublish(final int channelId, + final AMQShortString exchange, + final AMQShortString routingKey, + final boolean mandatory, + final boolean immediate) { - return new AMQFrame(channelId, new BasicPublishBody(0, exchange, routingKey, mandatory, immediate)); + _processedMethods.add(new AMQFrame(channelId, new BasicPublishBody(0, exchange, routingKey, mandatory, immediate))); } @Override - public AMQFrame basicReturn(final int channelId, final int replyCode, - final AMQShortString replyText, - final AMQShortString exchange, - final AMQShortString routingKey) + public void receiveBasicReturn(final int channelId, final int replyCode, + final AMQShortString replyText, + final AMQShortString exchange, + final AMQShortString routingKey) { - return new AMQFrame(channelId, new BasicReturnBody(replyCode, replyText, exchange, routingKey)); + _processedMethods.add(new AMQFrame(channelId, new BasicReturnBody(replyCode, replyText, exchange, routingKey))); } @Override - public AMQFrame basicDeliver(final int channelId, - final AMQShortString consumerTag, - final long deliveryTag, - final boolean redelivered, - final AMQShortString exchange, - final AMQShortString routingKey) + public void receiveBasicDeliver(final int channelId, + final AMQShortString consumerTag, + final long deliveryTag, + final boolean redelivered, + final AMQShortString exchange, + final AMQShortString routingKey) { - return new AMQFrame(channelId, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey)); + _processedMethods.add(new AMQFrame(channelId, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey))); } @Override - public AMQFrame basicGet(final int channelId, final AMQShortString queue, final boolean noAck) + public void receiveBasicGet(final int channelId, final AMQShortString queue, final boolean noAck) { - return new AMQFrame(channelId, new BasicGetBody(0, queue, noAck)); + _processedMethods.add(new AMQFrame(channelId, new BasicGetBody(0, queue, noAck))); } @Override - public AMQFrame basicGetOk(final int channelId, - final long deliveryTag, - final boolean redelivered, - final AMQShortString exchange, - final AMQShortString routingKey, - final long messageCount) + public void receiveBasicGetOk(final int channelId, + final long deliveryTag, + final boolean redelivered, + final AMQShortString exchange, + final AMQShortString routingKey, + final long messageCount) + { + _processedMethods.add(new AMQFrame(channelId, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount))); + } + + @Override + public void receiveBasicGetEmpty(final int channelId) { - return new AMQFrame(channelId, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount)); + _processedMethods.add(new AMQFrame(channelId, new BasicGetEmptyBody((AMQShortString)null))); } @Override - public AMQFrame basicGetEmpty(final int channelId) + public void receiveBasicAck(final int channelId, final long deliveryTag, final boolean multiple) { - return new AMQFrame(channelId, new BasicGetEmptyBody((AMQShortString)null)); + _processedMethods.add(new AMQFrame(channelId, new BasicAckBody(deliveryTag, multiple))); } @Override - public AMQFrame basicAck(final int channelId, final long deliveryTag, final boolean multiple) + public void receiveBasicReject(final int channelId, final long deliveryTag, final boolean requeue) { - return new AMQFrame(channelId, new BasicAckBody(deliveryTag, multiple)); + _processedMethods.add(new AMQFrame(channelId, new BasicRejectBody(deliveryTag, requeue))); } @Override - public AMQFrame basicReject(final int channelId, final long deliveryTag, final boolean requeue) + public void receiveHeartbeat() { - return new AMQFrame(channelId, new BasicRejectBody(deliveryTag, requeue)); + _processedMethods.add(new AMQFrame(0, new HeartbeatBody())); } @Override - public AMQFrame heartbeat() + public ProtocolVersion getProtocolVersion() { - return new AMQFrame(0, new HeartbeatBody()); + return _protocolVersion; } - private ProtocolVersion getProtocolVersion() + public void setProtocolVersion(final ProtocolVersion protocolVersion) + { + _protocolVersion = protocolVersion; + } + + @Override + public void receiveMessageContent(final int channelId, final byte[] data) { - return _methodRegistry.getProtocolVersion(); + _processedMethods.add(new AMQFrame(channelId, new ContentBody(data))); } @Override - public AMQFrame messageContent(final int channelId, final byte[] data) + public void receiveMessageHeader(final int channelId, + final BasicContentHeaderProperties properties, + final long bodySize) { - return new AMQFrame(channelId, new ContentBody(data)); + _processedMethods.add(new AMQFrame(channelId, new ContentHeaderBody(properties, bodySize))); } @Override - public AMQFrame messageHeader(final int channelId, - final BasicContentHeaderProperties properties, - final long bodySize) + public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation) { - return new AMQFrame(channelId, new ContentHeaderBody(properties, bodySize)); + _processedMethods.add(protocolInitiation); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java index 23f71c62db..b5f854eb0e 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java @@ -81,9 +81,9 @@ public class HeartbeatBody implements AMQBody return new AMQFrame(0, this); } - public static T process(final int channel, + public static void process(final int channel, final MarkableDataInput in, - final MethodProcessor processor, + final MethodProcessor processor, final long bodySize) throws IOException { @@ -91,6 +91,6 @@ public class HeartbeatBody implements AMQBody { in.skip(bodySize); } - return processor.heartbeat(); + processor.receiveHeartbeat(); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java index ecedacaba4..e995cbf181 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java +++ b/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java @@ -20,178 +20,182 @@ */ package org.apache.qpid.framing; -public interface MethodProcessor +public interface MethodProcessor { - T connectionStart(short versionMajor, - short versionMinor, - FieldTable serverProperties, - byte[] mechanisms, - byte[] locales); + ProtocolVersion getProtocolVersion(); - T connectionStartOk(FieldTable clientProperties, - AMQShortString mechanism, - byte[] response, - AMQShortString locale); + void receiveConnectionStart(short versionMajor, + short versionMinor, + FieldTable serverProperties, + byte[] mechanisms, + byte[] locales); - T txSelect(int channelId); + void receiveConnectionStartOk(FieldTable clientProperties, + AMQShortString mechanism, + byte[] response, + AMQShortString locale); - T txSelectOk(int channelId); + void receiveTxSelect(int channelId); - T txCommit(int channelId); + void receiveTxSelectOk(int channelId); - T txCommitOk(int channelId); + void receiveTxCommit(int channelId); - T txRollback(int channelId); + void receiveTxCommitOk(int channelId); - T txRollbackOk(int channelId); + void receiveTxRollback(int channelId); - T connectionSecure(byte[] challenge); + void receiveTxRollbackOk(int channelId); - T connectionSecureOk(byte[] response); + void receiveConnectionSecure(byte[] challenge); - T connectionTune(int channelMax, long frameMax, int heartbeat); + void receiveConnectionSecureOk(byte[] response); - T connectionTuneOk(int channelMax, long frameMax, int heartbeat); + void receiveConnectionTune(int channelMax, long frameMax, int heartbeat); - T connectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist); + void receiveConnectionTuneOk(int channelMax, long frameMax, int heartbeat); - T connectionOpenOk(AMQShortString knownHosts); + void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist); - T connectionRedirect(AMQShortString host, AMQShortString knownHosts); + void receiveConnectionOpenOk(AMQShortString knownHosts); - T connectionClose(int replyCode, AMQShortString replyText, int classId, int methodId); + void receiveConnectionRedirect(AMQShortString host, AMQShortString knownHosts); - T connectionCloseOk(); + void receiveConnectionClose(int replyCode, AMQShortString replyText, int classId, int methodId); - T channelOpen(int channelId); + void receiveConnectionCloseOk(); - T channelOpenOk(int channelId); + void receiveChannelOpen(int channelId); - T channelFlow(int channelId, boolean active); + void receiveChannelOpenOk(int channelId); - T channelFlowOk(int channelId, boolean active); + void receiveChannelFlow(int channelId, boolean active); - T channelAlert(int channelId, int replyCode, final AMQShortString replyText, FieldTable details); + void receiveChannelFlowOk(int channelId, boolean active); - T channelClose(int channelId, int replyCode, AMQShortString replyText, int classId, int methodId); + void receiveChannelAlert(int channelId, int replyCode, final AMQShortString replyText, FieldTable details); - T channelCloseOk(int channelId); + void receiveChannelClose(int channelId, int replyCode, AMQShortString replyText, int classId, int methodId); - T accessRequest(int channelId, - AMQShortString realm, - boolean exclusive, - boolean passive, - boolean active, - boolean write, boolean read); + void receiveChannelCloseOk(int channelId); - T accessRequestOk(int channelId, int ticket); + void receiveAccessRequest(int channelId, + AMQShortString realm, + boolean exclusive, + boolean passive, + boolean active, + boolean write, boolean read); - T exchangeDeclare(int channelId, - AMQShortString exchange, - AMQShortString type, - boolean passive, - boolean durable, - boolean autoDelete, boolean internal, boolean nowait, final FieldTable arguments); + void receiveAccessRequestOk(int channelId, int ticket); - T exchangeDeclareOk(int channelId); + void receiveExchangeDeclare(int channelId, + AMQShortString exchange, + AMQShortString type, + boolean passive, + boolean durable, + boolean autoDelete, boolean internal, boolean nowait, final FieldTable arguments); - T exchangeDelete(int channelId, AMQShortString exchange, boolean ifUnused, boolean nowait); + void receiveExchangeDeclareOk(int channelId); - T exchangeDeleteOk(int channelId); + void receiveExchangeDelete(int channelId, AMQShortString exchange, boolean ifUnused, boolean nowait); - T exchangeBound(int channelId, AMQShortString exchange, AMQShortString routingKey, AMQShortString queue); + void receiveExchangeDeleteOk(int channelId); - T exchangeBoundOk(int channelId, int replyCode, AMQShortString replyText); + void receiveExchangeBound(int channelId, AMQShortString exchange, AMQShortString routingKey, AMQShortString queue); - T queueBindOk(int channelId); + void receiveExchangeBoundOk(int channelId, int replyCode, AMQShortString replyText); - T queueUnbindOk(final int channelId); + void receiveQueueBindOk(int channelId); - T queueDeclare(int channelId, - AMQShortString queue, - boolean passive, - boolean durable, - boolean exclusive, - boolean autoDelete, boolean nowait, FieldTable arguments); + void receiveQueueUnbindOk(final int channelId); - T queueDeclareOk(int channelId, final AMQShortString queue, long messageCount, long consumerCount); + void receiveQueueDeclare(int channelId, + AMQShortString queue, + boolean passive, + boolean durable, + boolean exclusive, + boolean autoDelete, boolean nowait, FieldTable arguments); - T queueBind(int channelId, - AMQShortString queue, - AMQShortString exchange, - AMQShortString bindingKey, - boolean nowait, FieldTable arguments); + void receiveQueueDeclareOk(int channelId, final AMQShortString queue, long messageCount, long consumerCount); - T queuePurge(int channelId, AMQShortString queue, boolean nowait); + void receiveQueueBind(int channelId, + AMQShortString queue, + AMQShortString exchange, + AMQShortString bindingKey, + boolean nowait, FieldTable arguments); - T queuePurgeOk(int channelId, long messageCount); + void receiveQueuePurge(int channelId, AMQShortString queue, boolean nowait); - T queueDelete(int channelId, AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait); + void receiveQueuePurgeOk(int channelId, long messageCount); - T queueDeleteOk(int channelId, long messageCount); + void receiveQueueDelete(int channelId, AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait); - T queueUnbind(int channelId, - AMQShortString queue, - AMQShortString exchange, - AMQShortString bindingKey, - FieldTable arguments); + void receiveQueueDeleteOk(int channelId, long messageCount); - T basicRecoverSyncOk(int channelId); + void receiveQueueUnbind(int channelId, + AMQShortString queue, + AMQShortString exchange, + AMQShortString bindingKey, + FieldTable arguments); - T basicRecover(int channelId, final boolean requeue, boolean sync); + void receiveBasicRecoverSyncOk(int channelId); - T basicQos(int channelId, long prefetchSize, int prefetchCount, boolean global); + void receiveBasicRecover(int channelId, final boolean requeue, boolean sync); - T basicQosOk(int channelId); + void receiveBasicQos(int channelId, long prefetchSize, int prefetchCount, boolean global); - T basicConsume(int channelId, - AMQShortString queue, - AMQShortString consumerTag, - boolean noLocal, - boolean noAck, - boolean exclusive, boolean nowait, FieldTable arguments); + void receiveBasicQosOk(int channelId); - T basicConsumeOk(int channelId, AMQShortString consumerTag); + void receiveBasicConsume(int channelId, + AMQShortString queue, + AMQShortString consumerTag, + boolean noLocal, + boolean noAck, + boolean exclusive, boolean nowait, FieldTable arguments); - T basicCancel(int channelId, AMQShortString consumerTag, boolean noWait); + void receiveBasicConsumeOk(int channelId, AMQShortString consumerTag); - T basicCancelOk(int channelId, AMQShortString consumerTag); + void receiveBasicCancel(int channelId, AMQShortString consumerTag, boolean noWait); - T basicPublish(int channelId, - AMQShortString exchange, - AMQShortString routingKey, - boolean mandatory, - boolean immediate); + void receiveBasicCancelOk(int channelId, AMQShortString consumerTag); - T basicReturn(final int channelId, - int replyCode, - AMQShortString replyText, - AMQShortString exchange, - AMQShortString routingKey); + void receiveBasicPublish(int channelId, + AMQShortString exchange, + AMQShortString routingKey, + boolean mandatory, + boolean immediate); - T basicDeliver(int channelId, - AMQShortString consumerTag, - long deliveryTag, - boolean redelivered, - AMQShortString exchange, AMQShortString routingKey); + void receiveBasicReturn(final int channelId, + int replyCode, + AMQShortString replyText, + AMQShortString exchange, + AMQShortString routingKey); - T basicGet(int channelId, AMQShortString queue, boolean noAck); + void receiveBasicDeliver(int channelId, + AMQShortString consumerTag, + long deliveryTag, + boolean redelivered, + AMQShortString exchange, AMQShortString routingKey); - T basicGetOk(int channelId, - long deliveryTag, - boolean redelivered, - AMQShortString exchange, - AMQShortString routingKey, long messageCount); + void receiveBasicGet(int channelId, AMQShortString queue, boolean noAck); - T basicGetEmpty(int channelId); + void receiveBasicGetOk(int channelId, + long deliveryTag, + boolean redelivered, + AMQShortString exchange, + AMQShortString routingKey, long messageCount); - T basicAck(int channelId, long deliveryTag, boolean multiple); + void receiveBasicGetEmpty(int channelId); - T basicReject(int channelId, long deliveryTag, boolean requeue); + void receiveBasicAck(int channelId, long deliveryTag, boolean multiple); - T heartbeat(); + void receiveBasicReject(int channelId, long deliveryTag, boolean requeue); - T messageContent(int channelId, byte[] data); + void receiveHeartbeat(); - T messageHeader(int channelId, BasicContentHeaderProperties properties, long bodySize); + void receiveMessageContent(int channelId, byte[] data); + + void receiveMessageHeader(int channelId, BasicContentHeaderProperties properties, long bodySize); + + void receiveProtocolHeader(ProtocolInitiation protocolInitiation); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java b/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java index c4fd131d0e..45c198942b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java +++ b/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java @@ -31,14 +31,12 @@ package org.apache.qpid.framing; public final class MethodRegistry { - private final FrameCreatingMethodProcessor _methodProcessor; private ProtocolVersion _protocolVersion; public MethodRegistry(ProtocolVersion pv) { _protocolVersion = pv; - _methodProcessor = new FrameCreatingMethodProcessor(this); } public void setProtocolVersion(final ProtocolVersion protocolVersion) @@ -555,10 +553,5 @@ public final class MethodRegistry return _protocolVersion; } - public FrameCreatingMethodProcessor getMethodProcessor() - { - return _methodProcessor; - } - } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java index 42e1c44d7d..e4419f77e3 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java @@ -165,9 +165,9 @@ public class QueueBindBody extends AMQMethodBodyImpl implements EncodableAMQData return buf.toString(); } - public static T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException + final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { int ticket = buffer.readUnsignedShort(); @@ -176,6 +176,6 @@ public class QueueBindBody extends AMQMethodBodyImpl implements EncodableAMQData AMQShortString bindingKey = buffer.readAMQShortString(); boolean nowait = (buffer.readByte() & 0x01) == 0x01; FieldTable arguments = EncodingUtils.readFieldTable(buffer); - return dispatcher.queueBind(channelId, queue, exchange, bindingKey, nowait, arguments); + dispatcher.receiveQueueBind(channelId, queue, exchange, bindingKey, nowait, arguments); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java index 3a8d2f41a5..1f9888c76a 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java @@ -191,9 +191,9 @@ public class QueueDeclareBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } - public static T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException + final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { int ticket = buffer.readUnsignedShort(); @@ -206,6 +206,6 @@ public class QueueDeclareBody extends AMQMethodBodyImpl implements EncodableAMQD boolean autoDelete = (bitfield & 0x08 ) == 0x08; boolean nowait = (bitfield & 0x010 ) == 0x010; FieldTable arguments = EncodingUtils.readFieldTable(buffer); - return dispatcher.queueDeclare(channelId, queue, passive, durable, exclusive, autoDelete, nowait, arguments); + dispatcher.receiveQueueDeclare(channelId, queue, passive, durable, exclusive, autoDelete, nowait, arguments); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java index 47deb9cd6d..9857bb3a39 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java @@ -120,13 +120,13 @@ public class QueueDeclareOkBody extends AMQMethodBodyImpl implements EncodableAM return buf.toString(); } - public static T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { AMQShortString queue = buffer.readAMQShortString(); long messageCount = EncodingUtils.readUnsignedInteger(buffer); long consumerCount = EncodingUtils.readUnsignedInteger(buffer); - return dispatcher.queueDeclareOk(channelId, queue, messageCount, consumerCount); + dispatcher.receiveQueueDeclareOk(channelId, queue, messageCount, consumerCount); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java index fc9795f48b..408f9f9667 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java @@ -151,9 +151,9 @@ public class QueueDeleteBody extends AMQMethodBodyImpl implements EncodableAMQDa return buf.toString(); } - public static T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { int ticket = buffer.readUnsignedShort(); @@ -163,6 +163,6 @@ public class QueueDeleteBody extends AMQMethodBodyImpl implements EncodableAMQDa boolean ifUnused = (bitfield & 0x01) == 0x01; boolean ifEmpty = (bitfield & 0x02) == 0x02; boolean nowait = (bitfield & 0x04) == 0x04; - return dispatcher.queueDelete(channelId, queue, ifUnused, ifEmpty, nowait); + dispatcher.receiveQueueDelete(channelId, queue, ifUnused, ifEmpty, nowait); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java index b04f844084..b43369b68a 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java @@ -95,11 +95,11 @@ public class QueueDeleteOkBody extends AMQMethodBodyImpl implements EncodableAMQ return buf.toString(); } - public static T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { long messageCount = EncodingUtils.readUnsignedInteger(buffer); - return dispatcher.queueDeleteOk(channelId, messageCount); + dispatcher.receiveQueueDeleteOk(channelId, messageCount); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java index d2f41922cc..5a04e21355 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java @@ -125,14 +125,14 @@ public class QueuePurgeBody extends AMQMethodBodyImpl implements EncodableAMQDat return buf.toString(); } - public static T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { int ticket = buffer.readUnsignedShort(); AMQShortString queue = buffer.readAMQShortString(); boolean nowait = (buffer.readByte() & 0x01) == 0x01; - return dispatcher.queuePurge(channelId, queue, nowait); + dispatcher.receiveQueuePurge(channelId, queue, nowait); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java index da5ba766ae..40cac8b390 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java @@ -95,11 +95,11 @@ public class QueuePurgeOkBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } - public static T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { long messageCount = EncodingUtils.readUnsignedInteger(buffer); - return dispatcher.queuePurgeOk(channelId, messageCount); + dispatcher.receiveQueuePurgeOk(channelId, messageCount); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java index 968cc02212..a6f3e5b4c5 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java @@ -147,9 +147,9 @@ public class QueueUnbindBody extends AMQMethodBodyImpl implements EncodableAMQDa return buf.toString(); } - public static T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException + final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { int ticket = buffer.readUnsignedShort(); @@ -157,6 +157,6 @@ public class QueueUnbindBody extends AMQMethodBodyImpl implements EncodableAMQDa AMQShortString exchange = buffer.readAMQShortString(); AMQShortString routingKey = buffer.readAMQShortString(); FieldTable arguments = EncodingUtils.readFieldTable(buffer); - return dispatcher.queueUnbind(channelId, queue, exchange, routingKey, arguments); + dispatcher.receiveQueueUnbind(channelId, queue, exchange, routingKey, arguments); } } diff --git a/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java b/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java index c61bfb302b..63696515c6 100644 --- a/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java +++ b/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java @@ -25,7 +25,7 @@ import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; +import java.util.List; import junit.framework.TestCase; @@ -33,19 +33,21 @@ import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQFrameDecodingException; import org.apache.qpid.framing.AMQProtocolVersionException; +import org.apache.qpid.framing.FrameCreatingMethodProcessor; import org.apache.qpid.framing.HeartbeatBody; -import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.ProtocolVersion; public class AMQDecoderTest extends TestCase { private AMQDecoder _decoder; + private FrameCreatingMethodProcessor _methodProcessor; public void setUp() { - _decoder = new AMQDecoder(false, new MethodRegistry(ProtocolVersion.v0_91)); + _methodProcessor = new FrameCreatingMethodProcessor(ProtocolVersion.v0_91); + _decoder = new AMQDecoder(false, _methodProcessor); } @@ -59,7 +61,8 @@ public class AMQDecoderTest extends TestCase public void testSingleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException, IOException { ByteBuffer msg = getHeartbeatBodyBuffer(); - ArrayList frames = _decoder.decodeBuffer(msg); + _decoder.decodeBuffer(msg); + List frames = _methodProcessor.getProcessedMethods(); if (frames.get(0) instanceof AMQFrame) { assertEquals(HeartbeatBody.FRAME.getBodyFrame().getFrameType(), ((AMQFrame) frames.get(0)).getBodyFrame().getFrameType()); @@ -79,9 +82,12 @@ public class AMQDecoderTest extends TestCase msgA.limit(msgaLimit); msg.position(msgbPos); ByteBuffer msgB = msg.slice(); - ArrayList frames = _decoder.decodeBuffer(msgA); + + _decoder.decodeBuffer(msgA); + List frames = _methodProcessor.getProcessedMethods(); assertEquals(0, frames.size()); - frames = _decoder.decodeBuffer(msgB); + + _decoder.decodeBuffer(msgB); assertEquals(1, frames.size()); if (frames.get(0) instanceof AMQFrame) { @@ -101,7 +107,8 @@ public class AMQDecoderTest extends TestCase msg.put(msgA); msg.put(msgB); msg.flip(); - ArrayList frames = _decoder.decodeBuffer(msg); + _decoder.decodeBuffer(msg); + List frames = _methodProcessor.getProcessedMethods(); assertEquals(2, frames.size()); for (AMQDataBlock frame : frames) { @@ -138,12 +145,15 @@ public class AMQDecoderTest extends TestCase sliceB.put(msgC); sliceB.flip(); msgC.limit(limit); - - ArrayList frames = _decoder.decodeBuffer(sliceA); + + _decoder.decodeBuffer(sliceA); + List frames = _methodProcessor.getProcessedMethods(); assertEquals(1, frames.size()); - frames = _decoder.decodeBuffer(sliceB); + frames.clear(); + _decoder.decodeBuffer(sliceB); assertEquals(1, frames.size()); - frames = _decoder.decodeBuffer(msgC); + frames.clear(); + _decoder.decodeBuffer(msgC); assertEquals(1, frames.size()); for (AMQDataBlock frame : frames) { diff --git a/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java b/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java index 353d17ac03..b4a8155978 100644 --- a/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java +++ b/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java @@ -27,7 +27,6 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -41,6 +40,7 @@ import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; +import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQDataBlockDecoder; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQFrameDecodingException; @@ -51,7 +51,7 @@ import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.ConnectionStartOkBody; import org.apache.qpid.framing.ConnectionTuneOkBody; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.FrameCreatingMethodProcessor; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.server.model.AuthenticationProvider; @@ -110,11 +110,11 @@ public class MaxFrameSizeTest extends QpidBrokerTestCase { @Override - public void evaluate(final Socket socket, final List frames) + public void evaluate(final Socket socket, final List frames) { if(!socket.isClosed()) { - AMQFrame lastFrame = frames.get(frames.size() - 1); + AMQFrame lastFrame = (AMQFrame) frames.get(frames.size() - 1); assertTrue("Connection should not be possible with a frame size < " + Constant.MIN_MAX_FRAME_SIZE, lastFrame.getBodyFrame() instanceof ConnectionCloseBody); } } @@ -159,11 +159,11 @@ public class MaxFrameSizeTest extends QpidBrokerTestCase { @Override - public void evaluate(final Socket socket, final List frames) + public void evaluate(final Socket socket, final List frames) { if(!socket.isClosed()) { - AMQFrame lastFrame = frames.get(frames.size() - 1); + AMQFrame lastFrame = (AMQFrame) frames.get(frames.size() - 1); assertTrue("Connection should not be possible with a frame size larger than the broker requested", lastFrame.getBodyFrame() instanceof ConnectionCloseBody); } } @@ -173,7 +173,7 @@ public class MaxFrameSizeTest extends QpidBrokerTestCase private static interface ResultEvaluator { - void evaluate(Socket socket, List frames); + void evaluate(Socket socket, List frames); } private void doAMQP08test(int frameSize, ResultEvaluator evaluator) @@ -236,17 +236,14 @@ public class MaxFrameSizeTest extends QpidBrokerTestCase byte[] serverData = baos.toByteArray(); ByteArrayDataInput badi = new ByteArrayDataInput(serverData); AMQDataBlockDecoder datablockDecoder = new AMQDataBlockDecoder(); - final MethodRegistry methodRegistry_0_91 = new MethodRegistry(ProtocolVersion.v0_91); + final FrameCreatingMethodProcessor methodProcessor = new FrameCreatingMethodProcessor(ProtocolVersion.v0_91); - List frames = new ArrayList<>(); while (datablockDecoder.decodable(badi)) { - frames.add(datablockDecoder.createAndPopulateFrame(methodRegistry_0_91.getProtocolVersion(), - methodRegistry_0_91.getMethodProcessor(), - badi)); + datablockDecoder.processInput(methodProcessor, badi); } - evaluator.evaluate(socket, frames); + evaluator.evaluate(socket, methodProcessor.getProcessedMethods()); } private static class TestClientDelegate extends ClientDelegate -- cgit v1.2.1