summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-10-13 00:58:45 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-10-13 00:58:45 +0000
commit1e437d92f66da4ef0dffbfb85e9e66e5b4f4f980 (patch)
treefc7be07855ef97588f8af0bbe53d79107a9d5544
parentb71808f6e2d65056b3cded958012ad1d96cd7391 (diff)
downloadqpid-python-1e437d92f66da4ef0dffbfb85e9e66e5b4f4f980.tar.gz
Migrate broker to new direct method dispatch mechanism
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6125-ProtocolRefactoring@1631275 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java129
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java386
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java104
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java11
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java826
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java964
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java133
-rw-r--r--java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java258
-rw-r--r--java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java234
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java404
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java12
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java38
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java12
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java78
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ClientMethodProcessor.java39
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentBody.java24
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java11
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java25
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java733
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java171
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java (renamed from java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessor.java)95
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ServerMethodProcessor.java (renamed from java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConnectionMethodProcessor.java)25
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/util/Functions.java24
-rw-r--r--java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java2
-rw-r--r--java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java14
67 files changed, 1774 insertions, 3343 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 27fa654843..4087b1f4a0 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -115,7 +115,7 @@ import org.apache.qpid.transport.TransportException;
public class AMQChannel
implements AMQSessionModel<AMQChannel, AMQProtocolEngine>,
AsyncAutoCommitTransaction.FutureRecorder,
- ChannelMethodProcessor
+ ServerChannelMethodProcessor
{
public static final int DEFAULT_PREFETCH = 4096;
@@ -376,27 +376,18 @@ public class AMQChannel
}
public void publishContentHeader(ContentHeaderBody contentHeaderBody)
- throws AMQException
{
- if (_currentMessage == null)
+ if (_logger.isDebugEnabled())
{
- throw new AMQException("Received content header without previously receiving a BasicPublish frame");
+ _logger.debug("Content header received on channel " + _channelId);
}
- else
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Content header received on channel " + _channelId);
- }
- _currentMessage.setContentHeaderBody(contentHeaderBody);
+ _currentMessage.setContentHeaderBody(contentHeaderBody);
- deliverCurrentMessageIfComplete();
- }
+ deliverCurrentMessageIfComplete();
}
private void deliverCurrentMessageIfComplete()
- throws AMQException
{
// check and deliver if header says body length is zero
if (_currentMessage.allContentReceived())
@@ -497,7 +488,7 @@ public class AMQChannel
* @throws AMQConnectionException if the message is mandatory close-on-no-route
* @see AMQProtocolEngine#isCloseWhenNoRoute()
*/
- private void handleUnroutableMessage(AMQMessage message) throws AMQConnectionException
+ private void handleUnroutableMessage(AMQMessage message)
{
boolean mandatory = message.isMandatory();
String description = currentMessageDescription();
@@ -512,26 +503,27 @@ public class AMQChannel
if (mandatory && isTransactional() && _connection.isCloseWhenNoRoute())
{
- throw new AMQConnectionException(
- AMQConstant.NO_ROUTE,
- "No route for message " + currentMessageDescription(),
- 0, 0, // default class and method ids
- getConnection().getMethodRegistry(),
- (Throwable) null);
- }
-
- if (mandatory || message.isImmediate())
- {
- _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " + currentMessageDescription(), message));
+ _connection.closeConnection(AMQConstant.NO_ROUTE,
+ "No route for message " + currentMessageDescription(), _channelId);
}
else
{
- AMQShortString exchangeName = _currentMessage.getExchangeName();
- AMQShortString routingKey = _currentMessage.getMessagePublishInfo().getRoutingKey();
+ if (mandatory || message.isImmediate())
+ {
+ _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE,
+ "No Route for message "
+ + currentMessageDescription(),
+ message));
+ }
+ else
+ {
+ AMQShortString exchangeName = _currentMessage.getExchangeName();
+ AMQShortString routingKey = _currentMessage.getMessagePublishInfo().getRoutingKey();
- getVirtualHost().getEventLogger().message(
- ExchangeMessages.DISCARDMSG(exchangeName == null ? null : exchangeName.asString(),
- routingKey == null ? null : routingKey.asString()));
+ getVirtualHost().getEventLogger().message(
+ ExchangeMessages.DISCARDMSG(exchangeName == null ? null : exchangeName.asString(),
+ routingKey == null ? null : routingKey.asString()));
+ }
}
}
@@ -550,13 +542,8 @@ public class AMQChannel
: _currentMessage.getMessagePublishInfo().getRoutingKey().toString());
}
- public void publishContentBody(ContentBody contentBody) throws AMQException
+ public void publishContentBody(ContentBody contentBody)
{
- if (_currentMessage == null)
- {
- throw new AMQException("Received content body without previously receiving a Content Header");
- }
-
if (_logger.isDebugEnabled())
{
_logger.debug(debugIdentity() + " content body received on channel " + _channelId);
@@ -568,13 +555,6 @@ public class AMQChannel
deliverCurrentMessageIfComplete();
}
- catch (AMQException e)
- {
- // we want to make sure we don't keep a reference to the message in the
- // event of an error
- _currentMessage = null;
- throw e;
- }
catch (RuntimeException e)
{
// we want to make sure we don't keep a reference to the message in the
@@ -1277,14 +1257,10 @@ public class AMQChannel
private AMQMessage createAMQMessage(IncomingMessage incomingMessage, StoredMessage<MessageMetaData> handle)
- throws AMQException
{
AMQMessage message = new AMQMessage(handle, _connection.getReference());
- final BasicContentHeaderProperties properties =
- incomingMessage.getContentHeader().getProperties();
-
return message;
}
@@ -1340,6 +1316,11 @@ public class AMQChannel
return _subject;
}
+ public boolean hasCurrentMessage()
+ {
+ return _currentMessage != null;
+ }
+
private class GetDeliveryMethod implements ClientDeliveryMethod
{
@@ -2242,7 +2223,10 @@ public class AMQChannel
}
@Override
- public void receiveChannelClose()
+ public void receiveChannelClose(final int replyCode,
+ final AMQShortString replyText,
+ final int classId,
+ final int methodId)
{
sync();
_connection.closeChannel(this);
@@ -2258,6 +2242,43 @@ public class AMQChannel
}
@Override
+ public void receiveMessageContent(final byte[] data)
+ {
+
+ if(hasCurrentMessage())
+ {
+ publishContentBody(new ContentBody(data));
+ }
+ else
+ {
+ _connection.closeConnection(AMQConstant.COMMAND_INVALID,
+ "Attempt to send a content header without first sending a publish frame",
+ _channelId);
+ }
+ }
+
+ @Override
+ public void receiveMessageHeader(final BasicContentHeaderProperties properties, final long bodySize)
+ {
+ if(hasCurrentMessage())
+ {
+ publishContentHeader(new ContentHeaderBody(properties, bodySize));
+ }
+ else
+ {
+ _connection.closeConnection(AMQConstant.COMMAND_INVALID,
+ "Attempt to send a content header without first sending a publish frame",
+ _channelId);
+ }
+ }
+
+ @Override
+ public boolean ignoreAllButCloseOk()
+ {
+ return _connection.ignoreAllButCloseOk() || _connection.channelAwaitingClosure(_channelId);
+ }
+
+ @Override
public void receiveChannelFlow(final boolean active)
{
sync();
@@ -2270,9 +2291,15 @@ public class AMQChannel
}
@Override
+ public void receiveChannelFlowOk(final boolean active)
+ {
+ // TODO - should we do anything here?
+ }
+
+ @Override
public void receiveExchangeBound(final AMQShortString exchangeName,
- final AMQShortString queueName,
- final AMQShortString routingKey)
+ final AMQShortString routingKey,
+ final AMQShortString queueName)
{
VirtualHostImpl virtualHost = _connection.getVirtualHost();
MethodRegistry methodRegistry = _connection.getMethodRegistry();
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index dce0c3128e..3a4e780db6 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -21,6 +21,9 @@
package org.apache.qpid.server.protocol.v0_8;
import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -28,8 +31,6 @@ import java.security.AccessControlException;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -49,7 +50,6 @@ import javax.security.sasl.SaslServer;
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQDecoder;
@@ -58,8 +58,6 @@ import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.framing.*;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.connection.ConnectionPrincipal;
@@ -93,8 +91,7 @@ import org.apache.qpid.util.BytesDataOutput;
public class AMQProtocolEngine implements ServerProtocolEngine,
AMQConnectionModel<AMQProtocolEngine, AMQChannel>,
- AMQVersionAwareProtocolSession,
- ConnectionMethodProcessor
+ ServerMethodProcessor<ServerChannelMethodProcessor>
{
private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
@@ -114,9 +111,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
private VirtualHostImpl<?,?,?> _virtualHost;
private final Map<Integer, AMQChannel> _channelMap =
- new HashMap<Integer, AMQChannel>();
+ new HashMap<>();
private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners =
- new CopyOnWriteArrayList<SessionModelListener>();
+ new CopyOnWriteArrayList<>();
private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
@@ -128,7 +125,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
* Thread-safety: guarded by {@link #_receivedLock}.
*/
private final Set<AMQChannel> _channelsForCurrentMessage =
- new HashSet<AMQChannel>();
+ new HashSet<>();
private AMQDecoder _decoder;
@@ -142,14 +139,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
/* AMQP Version for this session */
private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
private final MethodRegistry _methodRegistry = new MethodRegistry(_protocolVersion);
- private final FrameCreatingMethodProcessor _methodProcessor = new FrameCreatingMethodProcessor(_protocolVersion);
private final List<Action<? super AMQProtocolEngine>> _taskList =
- new CopyOnWriteArrayList<Action<? super AMQProtocolEngine>>();
+ new CopyOnWriteArrayList<>();
- private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>();
+ private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<>();
private ProtocolOutputConverter _protocolOutputConverter;
private final Subject _authorizedSubject = new Subject();
- private MethodDispatcher _dispatcher;
private final long _connectionID;
private Object _reference = new Object();
@@ -183,6 +178,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
private boolean _authenticated;
private boolean _compressionSupported;
private int _messageCompressionThreshold;
+ private int _currentClassId;
+ private int _currentMethodId;
public AMQProtocolEngine(Broker broker,
final NetworkConnection network,
@@ -195,7 +192,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
_transport = transport;
_maxNoOfChannels = broker.getConnection_sessionCountLimit();
_receivedLock = new ReentrantLock();
- _decoder = new AMQDecoder(true, _methodProcessor);
+ _decoder = new BrokerDecoder(this);
_connectionID = connectionId;
_logSubject = new ConnectionLogSubject(this);
@@ -306,32 +303,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
_readBytes += msg.remaining();
_receivedLock.lock();
- List<AMQDataBlock> processedMethods = _methodProcessor.getProcessedMethods();
try
{
_decoder.decodeBuffer(msg);
- for (AMQDataBlock dataBlock : processedMethods)
- {
- try
- {
- dataBlockReceived(dataBlock);
- }
- catch(AMQConnectionException e)
- {
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Caught AMQConnectionException but will simply stop processing data blocks - the connection should already be closed.", e);
- }
- break;
- }
- catch (AMQException e)
- {
- _logger.error("Unexpected exception when processing datablock", e);
- closeProtocolSession();
- break;
- }
- }
- processedMethods.clear();
receivedComplete();
}
catch (ConnectionScopedRuntimeException e)
@@ -361,7 +335,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
}
finally
{
- processedMethods.clear();
_receivedLock.unlock();
}
return null;
@@ -399,112 +372,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
}
}
- /**
- * Process the data block.
- * If the message is for a channel it is added to {@link #_channelsForCurrentMessage}.
- *
- * @throws AMQConnectionException if unable to process the data block. In this case,
- * the connection is already closed by the time the exception is thrown. If any other
- * type of exception is thrown, the connection is not already closed.
- */
- private void dataBlockReceived(AMQDataBlock message) throws AMQException
- {
- if (message instanceof ProtocolInitiation)
- {
- protocolInitiationReceived((ProtocolInitiation) message);
-
- }
- else if (message instanceof AMQFrame)
- {
- AMQFrame frame = (AMQFrame) message;
- frameReceived(frame);
- }
- else
- {
- throw new AMQException("Unknown message type: " + message.getClass().getName() + ": " + message);
- }
- }
-
- /**
- * Handle the supplied frame.
- * Adds this frame's channel to {@link #_channelsForCurrentMessage}.
- *
- * @throws AMQConnectionException if unable to process the data block. In this case,
- * the connection is already closed by the time the exception is thrown. If any other
- * type of exception is thrown, the connection is not already closed.
- */
- private void frameReceived(AMQFrame frame) throws AMQException
+ void channelRequiresSync(final AMQChannel amqChannel)
{
- int channelId = frame.getChannel();
- AMQChannel amqChannel = _channelMap.get(channelId);
- if(amqChannel != null)
- {
- // The _receivedLock is already acquired in the caller
- // It is safe to add channel
- _channelsForCurrentMessage.add(amqChannel);
- }
- else
- {
- // Not an error. The frame is probably a channel Open for this channel id, which
- // does not require asynchronous work therefore its absence from
- // _channelsForCurrentMessage is ok.
- }
-
- AMQBody body = frame.getBodyFrame();
-
- long startTime = 0;
- String frameToString = null;
- if (_logger.isDebugEnabled())
- {
- startTime = System.currentTimeMillis();
- frameToString = frame.toString();
- _logger.debug("RECV: " + frame);
- }
-
- // Check that this channel is not closing
- if (channelAwaitingClosure(channelId))
- {
- if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
- }
- }
- else
- {
- // The channel has been told to close, we don't process any more frames until
- // it's closed.
- return;
- }
- }
-
- try
- {
- body.handle(channelId, this);
- }
- catch(AMQConnectionException e)
- {
- _logger.info(e.getMessage() + " whilst processing frame: " + body);
- closeConnection(channelId, e);
- throw e;
- }
- catch (AMQException e)
- {
- closeChannel(channelId, e.getErrorCode() == null ? AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage());
- throw e;
- }
- catch (TransportException e)
- {
- closeChannel(channelId, AMQConstant.CHANNEL_ERROR, e.getMessage());
- throw e;
- }
-
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Frame handled in " + (System.currentTimeMillis() - startTime) + " ms. Frame: " + frameToString);
- }
+ _channelsForCurrentMessage.add(amqChannel);
}
private synchronized void protocolInitiationReceived(ProtocolInitiation pi)
@@ -623,148 +494,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
return buf;
}
- public void methodFrameReceived(int channelId, AMQMethodBody methodBody)
- {
- final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
-
- try
- {
- try
- {
- boolean wasAnyoneInterested = methodReceived(evt);
-
- if (!wasAnyoneInterested)
- {
- throw new AMQNoMethodHandlerException(evt);
- }
- }
- catch (AMQChannelException e)
- {
- if (getChannel(channelId) != null)
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Closing channel due to: " + e.getMessage());
- }
-
- AMQConstant errorType = e.getErrorCode();
- if(errorType == null)
- {
- errorType = AMQConstant.INTERNAL_ERROR;
- }
- writeFrame(new AMQFrame(channelId,
- getMethodRegistry().createChannelCloseBody(errorType.getCode(),
- AMQShortString.validValueOf(e.getMessage()),
- e.getClassId(),
- e.getMethodId())));
- closeChannel(channelId, errorType, e.getMessage());
- }
- else
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("ChannelException occurred on non-existent channel:" + e.getMessage());
- }
-
- if (_logger.isInfoEnabled())
- {
- _logger.info("Closing connection due to: " + e.getMessage());
- }
-
- AMQConnectionException ce = new AMQConnectionException(AMQConstant.CHANNEL_ERROR,
- AMQConstant.CHANNEL_ERROR.getName().toString(),
- methodBody, getMethodRegistry());
-
- _logger.info(e.getMessage() + " whilst processing:" + methodBody);
- closeConnection(channelId, ce);
- }
- }
- catch (AMQConnectionException e)
- {
- _logger.info(e.getMessage() + " whilst processing:" + methodBody);
- closeConnection(channelId, e);
- }
- }
- catch (Exception e)
- {
- _logger.error("Unexpected exception while processing frame. Closing connection.", e);
-
- closeProtocolSession();
- }
- }
- private <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
- {
- final MethodDispatcher dispatcher = getMethodDispatcher();
-
- final int channelId = evt.getChannelId();
- final B body = evt.getMethod();
-
- final AMQChannel channel = getChannel(channelId);
- if(channelId != 0 && channel == null)
- {
-
- if(! ((body instanceof ChannelOpenBody)
- || (body instanceof ChannelCloseOkBody)
- || (body instanceof ChannelCloseBody)))
- {
- throw new AMQConnectionException(AMQConstant.CHANNEL_ERROR, "channel is closed won't process:" + body, body, getMethodRegistry());
- }
-
- }
- if(channel == null)
- {
- return body.execute(dispatcher, channelId);
- }
- else
- {
- try
- {
- return Subject.doAs(channel.getSubject(), new PrivilegedExceptionAction<Boolean>()
- {
- @Override
- public Boolean run() throws AMQException
- {
- return body.execute(dispatcher, channelId);
- }
- });
- }
- catch (PrivilegedActionException e)
- {
- if(e.getCause() instanceof AMQException)
- {
- throw (AMQException) e.getCause();
- }
- else
- {
- throw new ServerScopedRuntimeException(e.getCause());
- }
- }
-
-
- }
-
- }
-
- public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
- {
-
- AMQChannel channel = getAndAssertChannel(channelId);
-
- channel.publishContentHeader(body);
-
- }
-
- public void contentBodyReceived(int channelId, ContentBody body) throws AMQException
- {
- AMQChannel channel = getAndAssertChannel(channelId);
-
- channel.publishContentBody(body);
- }
-
- public void heartbeatBodyReceived(int channelId, HeartbeatBody body)
- {
- // NO - OP
- }
/**
* Convenience method that writes a frame to the protocol session. Equivalent to calling
@@ -808,19 +537,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
{
synchronized (_channelMap)
{
- return new ArrayList<AMQChannel>(_channelMap.values());
- }
- }
-
- public AMQChannel getAndAssertChannel(int channelId) throws AMQException
- {
- AMQChannel channel = getChannel(channelId);
- if (channel == null)
- {
- throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId);
+ return new ArrayList<>(_channelMap.values());
}
-
- return channel;
}
public AMQChannel getChannel(int channelId)
@@ -899,8 +617,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
writeFrame(new AMQFrame(channel.getChannelId(),
getMethodRegistry().createChannelCloseBody(cause.getCode(),
AMQShortString.validValueOf(message),
- _methodProcessor.getClassId(),
- _methodProcessor.getMethodId())));
+ _currentClassId,
+ _currentMethodId)));
closeChannel(channel, cause, message, true);
}
@@ -1106,7 +824,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
{
_logger.info("Closing connection due to: " + message);
}
- closeConnection(channelId, new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(), AMQShortString.validValueOf(message), _methodProcessor.getClassId(), _methodProcessor.getMethodId())));
+ closeConnection(channelId, new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(), AMQShortString.validValueOf(message), _currentClassId, _currentMethodId)));
}
private void closeConnection(int channelId, AMQFrame frame)
@@ -1224,9 +942,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
{
_protocolVersion = pv;
_methodRegistry.setProtocolVersion(_protocolVersion);
- _methodProcessor.setProtocolVersion(_protocolVersion);
_protocolOutputConverter = new ProtocolOutputConverterImpl(this);
- _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(this);
}
public byte getProtocolMajorVersion()
@@ -1335,11 +1051,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
return _methodRegistry;
}
- public MethodDispatcher getMethodDispatcher()
- {
- return _dispatcher;
- }
-
public void closed()
{
try
@@ -1353,14 +1064,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
closeProtocolSession();
}
}
- catch (ConnectionScopedRuntimeException e)
+ catch (ConnectionScopedRuntimeException | TransportException e)
{
_logger.error("Could not close protocol engine", e);
}
- catch (TransportException e)
- {
- _logger.error("Could not close protocol engine", e);
- }
}
public void readerIdle()
@@ -1427,11 +1134,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
}
}
- public void setSender(Sender<ByteBuffer> sender)
- {
- // Do nothing
- }
-
public long getReadBytes()
{
return _readBytes;
@@ -1572,7 +1274,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
public List<AMQChannel> getSessionModels()
{
- return new ArrayList<AMQChannel>(getChannels());
+ return new ArrayList<>(getChannels());
}
public LogSubject getLogSubject()
@@ -2074,4 +1776,52 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
return _broker.getEventLogger();
}
}
+
+ @Override
+ public ServerChannelMethodProcessor getChannelMethodProcessor(final int channelId)
+ {
+ ServerChannelMethodProcessor channelMethodProcessor = getChannel(channelId);
+ if(channelMethodProcessor == null)
+ {
+ channelMethodProcessor = (ServerChannelMethodProcessor) Proxy.newProxyInstance(ServerMethodDispatcher.class.getClassLoader(),
+ new Class[] { ServerChannelMethodProcessor.class }, new InvocationHandler()
+ {
+ @Override
+ public Object invoke(final Object proxy, final Method method, final Object[] args)
+ throws Throwable
+ {
+ closeConnection(AMQConstant.CHANNEL_ERROR, "Unknown channel id: " + channelId, channelId);
+
+ return null;
+ }
+ });
+ }
+ return channelMethodProcessor;
+ }
+
+ @Override
+ public void receiveHeartbeat()
+ {
+ // No op
+ }
+
+ @Override
+ public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation)
+ {
+ protocolInitiationReceived(protocolInitiation);
+ }
+
+ @Override
+ public void setCurrentMethod(final int classId, final int methodId)
+ {
+ _currentClassId = classId;
+ _currentMethodId = methodId;
+ }
+
+ @Override
+ public boolean ignoreAllButCloseOk()
+ {
+ return _closing.get();
+ }
+
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
new file mode 100644
index 0000000000..5a4466b003
--- /dev/null
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import java.io.IOException;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+
+import javax.security.auth.Subject;
+
+import org.apache.qpid.codec.MarkableDataInput;
+import org.apache.qpid.codec.ServerDecoder;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
+
+public class BrokerDecoder extends ServerDecoder
+{
+ private final AMQProtocolEngine _connection;
+ /**
+ * Creates a new AMQP decoder.
+ *
+ * @param connection
+ */
+ public BrokerDecoder(final AMQProtocolEngine connection)
+ {
+ super(connection);
+ _connection = connection;
+ }
+
+ @Override
+ protected void processFrame(final int channelId, final byte type, final long bodySize, final MarkableDataInput in)
+ throws AMQFrameDecodingException, IOException
+ {
+ Subject subject;
+ AMQChannel channel = _connection.getChannel(channelId);
+ if(channel == null)
+ {
+ subject = _connection.getSubject();
+ }
+ else
+ {
+ _connection.channelRequiresSync(channel);
+
+ subject = channel.getSubject();
+ }
+ try
+ {
+ Subject.doAs(subject, new PrivilegedExceptionAction<Object>()
+ {
+ @Override
+ public Void run() throws IOException, AMQFrameDecodingException
+ {
+ doProcessFrame(channelId, type, bodySize, in);
+ return null;
+ }
+ });
+ }
+ catch (PrivilegedActionException e)
+ {
+ Throwable cause = e.getCause();
+ if(cause instanceof IOException)
+ {
+ throw (IOException) cause;
+ }
+ else if(cause instanceof AMQFrameDecodingException)
+ {
+ throw (AMQFrameDecodingException) cause;
+ }
+ else if(cause instanceof RuntimeException)
+ {
+ throw (RuntimeException) cause;
+ }
+ else throw new ServerScopedRuntimeException(cause);
+ }
+
+ }
+
+
+ private void doProcessFrame(final int channelId, final byte type, final long bodySize, final MarkableDataInput in)
+ throws AMQFrameDecodingException, IOException
+ {
+ super.processFrame(channelId, type, bodySize, in);
+
+ }
+
+}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
index 821d6101db..d966e9c9c6 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
@@ -20,16 +20,15 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.AMQException;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.server.message.MessageDestination;
-import java.util.ArrayList;
-import java.util.List;
-
public class IncomingMessage
{
@@ -58,7 +57,7 @@ public class IncomingMessage
return _messagePublishInfo;
}
- public void addContentBodyFrame(final ContentBody contentChunk) throws AMQException
+ public void addContentBodyFrame(final ContentBody contentChunk)
{
_bodyLengthReceived += contentChunk.getSize();
_contentChunks.add(contentChunk);
@@ -94,7 +93,7 @@ public class IncomingMessage
_messageDestination = e;
}
- public int getBodyCount() throws AMQException
+ public int getBodyCount()
{
return _contentChunks.size();
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java
deleted file mode 100644
index b3ee5f9ff9..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java
+++ /dev/null
@@ -1,826 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-import java.security.PrivilegedAction;
-
-import javax.security.auth.Subject;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.model.Broker;
-
-public class ServerMethodDispatcherImpl implements MethodDispatcher
-{
- private static final Logger _logger = Logger.getLogger(ServerMethodDispatcherImpl.class);
-
- private final AMQProtocolEngine _connection;
-
-
- private static interface ChannelAction
- {
- void onChannel(ChannelMethodProcessor channel);
- }
-
-
- private static interface ConnectionAction
- {
- void onConnection(ConnectionMethodProcessor connection);
- }
-
-
- public static MethodDispatcher createMethodDispatcher(AMQProtocolEngine connection)
- {
- return new ServerMethodDispatcherImpl(connection);
- }
-
-
- public ServerMethodDispatcherImpl(AMQProtocolEngine connection)
- {
- _connection = connection;
- }
-
-
- protected final AMQProtocolEngine getConnection()
- {
- return _connection;
- }
-
- private void processChannelMethod(int channelId, final ChannelAction action)
- {
- final AMQChannel channel = _connection.getChannel(channelId);
- if (channel == null)
- {
- closeConnection(AMQConstant.CHANNEL_ERROR, "Unknown channel id: " + channelId);
- }
- else
- {
- Subject.doAs(channel.getSubject(), new PrivilegedAction<Void>()
- {
- @Override
- public Void run()
- {
- action.onChannel(channel);
- return null;
- }
- });
- }
-
- }
-
- private void processConnectionMethod(final ConnectionAction action)
- {
- Subject.doAs(_connection.getSubject(), new PrivilegedAction<Void>()
- {
- @Override
- public Void run()
- {
- action.onConnection(_connection);
- return null;
- }
- });
-
-
- }
-
- public boolean dispatchAccessRequest(final AccessRequestBody body, int channelId)
- {
- processChannelMethod(channelId,
- new ChannelAction()
- {
- @Override
- public void onChannel(final ChannelMethodProcessor channel)
- {
- channel.receiveAccessRequest(body.getRealm(),
- body.getExclusive(),
- body.getPassive(),
- body.getActive(),
- body.getWrite(),
- body.getRead());
- }
- }
- );
-
- return true;
- }
-
- public boolean dispatchBasicAck(final BasicAckBody body, int channelId)
- {
- processChannelMethod(channelId,
- new ChannelAction()
- {
- @Override
- public void onChannel(final ChannelMethodProcessor channel)
- {
- channel.receiveBasicAck(body.getDeliveryTag(), body.getMultiple());
- }
- }
- );
-
- return true;
- }
-
- public boolean dispatchBasicCancel(final BasicCancelBody body, int channelId)
- {
- processChannelMethod(channelId,
- new ChannelAction()
- {
- @Override
- public void onChannel(final ChannelMethodProcessor channel)
- {
- channel.receiveBasicCancel(body.getConsumerTag(),
- body.getNowait()
- );
- }
- }
- );
- return true;
- }
-
- public boolean dispatchBasicConsume(final BasicConsumeBody body, int channelId)
- {
-
- processChannelMethod(channelId,
- new ChannelAction()
- {
- @Override
- public void onChannel(final ChannelMethodProcessor channel)
- {
- channel.receiveBasicConsume(body.getQueue(), body.getConsumerTag(),
- body.getNoLocal(), body.getNoAck(),
- body.getExclusive(), body.getNowait(),
- body.getArguments());
- }
- }
- );
-
-
- return true;
- }
-
- private void closeConnection(final AMQConstant constant,
- final String message)
- {
- _connection.closeConnection(constant, message, 0);
- }
-
- public boolean dispatchBasicGet(final BasicGetBody body, int channelId)
- {
-
- processChannelMethod(channelId,
- new ChannelAction()
- {
- @Override
- public void onChannel(final ChannelMethodProcessor channel)
- {
- channel.receiveBasicGet(body.getQueue(), body.getNoAck());
- }
- }
- );
- return true;
- }
-
- public boolean dispatchBasicPublish(final BasicPublishBody body, int channelId)
- {
- processChannelMethod(channelId,
- new ChannelAction()
- {
- @Override
- public void onChannel(final ChannelMethodProcessor channel)
- {
- channel.receiveBasicPublish(body.getExchange(), body.getRoutingKey(),
- body.getMandatory(), body.getImmediate());
- }
- }
- );
-
- return true;
- }
-
- public boolean dispatchBasicQos(final BasicQosBody body, int channelId)
- {
- processChannelMethod(channelId,
- new ChannelAction()
- {
- @Override
- public void onChannel(final ChannelMethodProcessor channel)
- {
- channel.receiveBasicQos(body.getPrefetchSize(), body.getPrefetchCount(),
- body.getGlobal());
- }
- }
- );
-
- return true;
- }
-
- public boolean dispatchBasicRecover(final BasicRecoverBody body, int channelId)
- {
- final boolean sync = _connection.getProtocolVersion().equals(ProtocolVersion.v8_0);
-
- processChannelMethod(channelId,
- new ChannelAction()
- {
- @Override
- public void onChannel(final ChannelMethodProcessor channel)
- {
- channel.receiveBasicRecover(body.getRequeue(), sync);
- }
- }
- );
-
- return true;
- }
-
- public boolean dispatchBasicReject(final BasicRejectBody body, int channelId)
- {
-
- processChannelMethod(channelId,
- new ChannelAction()
- {
- @Override
- public void onChannel(final ChannelMethodProcessor channel)
- {
- channel.receiveBasicReject(body.getDeliveryTag(), body.getRequeue());
- }
- }
- );
-
- return true;
- }
-
- public boolean dispatchChannelOpen(ChannelOpenBody body, final int channelId)
- {
- processConnectionMethod(new ConnectionAction()
- {
- @Override
- public void onConnection(final ConnectionMethodProcessor connection)
- {
- connection.receiveChannelOpen(channelId);
- }
- });
- return true;
- }
-
-
- public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
-
- public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchBasicConsumeOk(BasicConsumeOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchBasicGetEmpty(BasicGetEmptyBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchBasicGetOk(BasicGetOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchBasicQosOk(BasicQosOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchChannelClose(ChannelCloseBody body, int channelId)
- {
-
- processChannelMethod(channelId,
- new ChannelAction()
- {
- @Override
- public void onChannel(final ChannelMethodProcessor channel)
- {
- channel.receiveChannelClose();
- }
- }
- );
-
- return true;
- }
-
-
- public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId)
- {
-
- processChannelMethod(channelId,
- new ChannelAction()
- {
- @Override
- public void onChannel(final ChannelMethodProcessor channel)
- {
- channel.receiveChannelCloseOk();
- }
- }
- );
-
- return true;
- }
-
-
- public boolean dispatchChannelFlow(final ChannelFlowBody body, int channelId)
- {
-
- processChannelMethod(channelId,
- new ChannelAction()
- {
- @Override
- public void onChannel(final ChannelMethodProcessor channel)
- {
- channel.receiveChannelFlow(body.getActive());
- }
- }
- );
- return true;
- }
-
- public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchChannelOpenOk(ChannelOpenOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
-
- public boolean dispatchConnectionOpen(final ConnectionOpenBody body, int channelId)
- {
- processConnectionMethod(new ConnectionAction()
- {
- @Override
- public void onConnection(final ConnectionMethodProcessor connection)
- {
- connection.receiveConnectionOpen(body.getVirtualHost(), body.getCapabilities(), body.getInsist());
- }
- });
-
- return true;
- }
-
-
- public boolean dispatchConnectionClose(final ConnectionCloseBody body, int channelId)
- {
-
- processConnectionMethod(new ConnectionAction()
- {
- @Override
- public void onConnection(final ConnectionMethodProcessor connection)
- {
- connection.receiveConnectionClose(body.getReplyCode(),
- body.getReplyText(),
- body.getClassId(),
- body.getMethodId());
- }
- });
-
- return true;
- }
-
-
- public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId)
- {
-
- processConnectionMethod(new ConnectionAction()
- {
- @Override
- public void onConnection(final ConnectionMethodProcessor connection)
- {
- connection.receiveConnectionCloseOk();
- }
- });
-
- return true;
- }
-
- public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
-
- public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchExchangeDeclareOk(ExchangeDeclareOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchExchangeDeleteOk(ExchangeDeleteOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchQueueDeclareOk(QueueDeclareOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchQueuePurgeOk(QueuePurgeOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchTxRollbackOk(TxRollbackOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
-
- public boolean dispatchConnectionSecureOk(final ConnectionSecureOkBody body, int channelId)
- {
-
- processConnectionMethod(new ConnectionAction()
- {
- @Override
- public void onConnection(final ConnectionMethodProcessor connection)
- {
- connection.receiveConnectionSecureOk(body.getResponse());
- }
- });
-
- return true;
- }
-
- private void disposeSaslServer(AMQProtocolEngine connection)
- {
- SaslServer ss = connection.getSaslServer();
- if (ss != null)
- {
- connection.setSaslServer(null);
- try
- {
- ss.dispose();
- }
- catch (SaslException e)
- {
- _logger.error("Error disposing of Sasl server: " + e);
- }
- }
- }
-
- public boolean dispatchConnectionStartOk(final ConnectionStartOkBody body, int channelId)
- {
-
- processConnectionMethod(new ConnectionAction()
- {
- @Override
- public void onConnection(final ConnectionMethodProcessor connection)
- {
- connection.receiveConnectionStartOk(body.getClientProperties(),
- body.getMechanism(),
- body.getResponse(),
- body.getLocale());
- }
- });
-
- return true;
- }
-
- public boolean dispatchConnectionTuneOk(final ConnectionTuneOkBody body, int channelId)
- {
-
- processConnectionMethod(new ConnectionAction()
- {
- @Override
- public void onConnection(final ConnectionMethodProcessor connection)
- {
- connection.receiveConnectionTuneOk(body.getChannelMax(),
- body.getFrameMax(),
- body.getHeartbeat());
- }
- });
- final AMQProtocolEngine connection = getConnection();
-
-
- return true;
- }
-
- public boolean dispatchExchangeBound(final ExchangeBoundBody body, int channelId)
- {
- processChannelMethod(channelId,
- new ChannelAction()
- {
- @Override
- public void onChannel(final ChannelMethodProcessor channel)
- {
- channel.receiveExchangeBound(body.getExchange(), body.getQueue(), body.getRoutingKey());
- }
- }
- );
-
- return true;
- }
-
- public boolean dispatchExchangeDeclare(final ExchangeDeclareBody body, int channelId)
- {
- processChannelMethod(channelId,
- new ChannelAction()
- {
- @Override
- public void onChannel(final ChannelMethodProcessor channel)
- {
- channel.receiveExchangeDeclare(body.getExchange(), body.getType(),
- body.getPassive(),
- body.getDurable(),
- body.getAutoDelete(),
- body.getInternal(),
- body.getNowait(),
- body.getArguments());
- }
- }
- );
-
- return true;
- }
-
- public boolean dispatchExchangeDelete(final ExchangeDeleteBody body, int channelId)
- {
-
- processChannelMethod(channelId,
- new ChannelAction()
- {
- @Override
- public void onChannel(final ChannelMethodProcessor channel)
- {
- channel.receiveExchangeDelete(body.getExchange(),
- body.getIfUnused(),
- body.getNowait());
- }
- }
- );
-
- return true;
- }
-
- public boolean dispatchQueueBind(final QueueBindBody body, int channelId)
- {
- processChannelMethod(channelId,
- new ChannelAction()
- {
- @Override
- public void onChannel(final ChannelMethodProcessor channel)
- {
- channel.receiveQueueBind(body.getQueue(),
- body.getExchange(),
- body.getRoutingKey(),
- body.getNowait(),
- body.getArguments());
- }
- }
- );
-
- return true;
- }
-
- public boolean dispatchQueueDeclare(final QueueDeclareBody body, int channelId)
- {
- processChannelMethod(channelId,
- new ChannelAction()
- {
- @Override
- public void onChannel(final ChannelMethodProcessor channel)
- {
- channel.receiveQueueDeclare(body.getQueue(),
- body.getPassive(),
- body.getDurable(),
- body.getExclusive(),
- body.getAutoDelete(),
- body.getNowait(),
- body.getArguments());
- }
- }
- );
-
- return true;
- }
-
- public boolean dispatchQueueDelete(final QueueDeleteBody body, int channelId)
- {
- processChannelMethod(channelId,
- new ChannelAction()
- {
- @Override
- public void onChannel(final ChannelMethodProcessor channel)
- {
- channel.receiveQueueDelete(body.getQueue(),
- body.getIfUnused(),
- body.getIfEmpty(),
- body.getNowait());
- }
- }
- );
-
- return true;
- }
-
- public boolean dispatchQueuePurge(final QueuePurgeBody body, int channelId)
- {
-
- processChannelMethod(channelId,
- new ChannelAction()
- {
- @Override
- public void onChannel(final ChannelMethodProcessor channel)
- {
- channel.receiveQueuePurge(body.getQueue(),
- body.getNowait());
- }
- }
- );
-
- return true;
- }
-
-
- public boolean dispatchTxCommit(TxCommitBody body, final int channelId)
- {
-
- processChannelMethod(channelId,
- new ChannelAction()
- {
- @Override
- public void onChannel(final ChannelMethodProcessor channel)
- {
- channel.receiveTxCommit();
- }
- }
- );
-
- return true;
- }
-
- public boolean dispatchTxRollback(TxRollbackBody body, final int channelId)
- {
-
- processChannelMethod(channelId,
- new ChannelAction()
- {
- @Override
- public void onChannel(final ChannelMethodProcessor channel)
- {
- channel.receiveTxRollback();
- }
- }
- );
- return true;
- }
-
- public boolean dispatchTxSelect(TxSelectBody body, int channelId)
- {
- processChannelMethod(channelId,
- new ChannelAction()
- {
- @Override
- public void onChannel(final ChannelMethodProcessor channel)
- {
- channel.receiveTxSelect();
- }
- }
- );
- return true;
- }
-
- public boolean dispatchBasicRecoverSync(final BasicRecoverSyncBody body, int channelId)
- {
- processChannelMethod(channelId,
- new ChannelAction()
- {
- @Override
- public void onChannel(final ChannelMethodProcessor channel)
- {
- channel.receiveBasicRecover(body.getRequeue(), true);
- }
- }
- );
-
- return true;
- }
-
- public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- @Override
- public boolean dispatchChannelAlert(final ChannelAlertBody body, final int channelId)
- throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchQueueUnbind(final QueueUnbindBody body, int channelId)
- {
-
- processChannelMethod(channelId,
- new ChannelAction()
- {
- @Override
- public void onChannel(final ChannelMethodProcessor channel)
- {
- channel.receiveQueueUnbind(body.getQueue(),
- body.getExchange(),
- body.getRoutingKey(),
- body.getArguments());
- }
- }
- );
-
- return true;
- }
-
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java
deleted file mode 100644
index 625836bcf2..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java
+++ /dev/null
@@ -1,964 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-import java.security.PrivilegedAction;
-
-import javax.security.auth.Subject;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.framing.*;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.security.SubjectCreator;
-import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
-
-public class ServerMethodProcessor implements MethodProcessor
-{
- private static final Logger LOGGER = Logger.getLogger(ServerMethodProcessor.class);
- private int _classId;
- private int _methodId;
-
-
- private static interface ChannelAction
- {
- void onChannel(ChannelMethodProcessor channel);
- }
-
- private ProtocolVersion _protocolVersion;
- private ServerMethodDispatcherImpl _dispatcher;
- private AMQProtocolEngine _connection;
-
- public ServerMethodProcessor(final ProtocolVersion protocolVersion)
- {
- _protocolVersion = protocolVersion;
- }
-
-
- private void processChannelMethod(int channelId, final ChannelAction action)
- {
- final AMQChannel channel = _connection.getChannel(channelId);
- if (channel == null)
- {
- // TODO throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
- }
- else
- {
- Subject.doAs(channel.getSubject(), new PrivilegedAction<Void>()
- {
- @Override
- public Void run()
- {
- action.onChannel(channel);
- return null;
- }
- });
- }
-
- }
-
- @Override
- public void receiveConnectionStart(final short versionMajor,
- final short versionMinor,
- final FieldTable serverProperties,
- final byte[] mechanisms,
- final byte[] locales)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(0,
- new ConnectionStartBody(versionMajor,
- versionMinor,
- serverProperties,
- mechanisms,
- locales));
- }
- _connection.closeConnection(AMQConstant.COMMAND_INVALID, "Unexpected method received: ConnectionStart", 0
- );
-
- }
-
- @Override
- public void receiveConnectionStartOk(final FieldTable clientProperties,
- final AMQShortString mechanism,
- final byte[] response,
- final AMQShortString locale)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(0, new ConnectionStartOkBody(clientProperties, mechanism, response, locale));
- }
-
- Broker<?> broker = _connection.getBroker();
-
- SubjectCreator subjectCreator = _connection.getSubjectCreator();
- SaslServer ss = null;
- try
- {
- ss = subjectCreator.createSaslServer(String.valueOf(mechanism),
- _connection.getLocalFQDN(),
- _connection.getPeerPrincipal());
-
- if (ss == null)
- {
- _connection.closeConnection(AMQConstant.RESOURCE_ERROR,
- "Unable to create SASL Server:" + mechanism, 0
- );
- }
- else
- {
- _connection.setSaslServer(ss);
-
- final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, response);
- //save clientProperties
- _connection.setClientProperties(clientProperties);
-
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
-
- switch (authResult.getStatus())
- {
- case ERROR:
- Exception cause = authResult.getCause();
-
- LOGGER.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
-
- _connection.closeConnection(AMQConstant.NOT_ALLOWED,
- AMQConstant.NOT_ALLOWED.getName().toString(), 0
- );
-
- disposeSaslServer();
- break;
-
- case SUCCESS:
- if (LOGGER.isInfoEnabled())
- {
- LOGGER.info("Connected as: " + authResult.getSubject());
- }
- _connection.setAuthorizedSubject(authResult.getSubject());
-
- int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
-
- if (frameMax <= 0)
- {
- frameMax = Integer.MAX_VALUE;
- }
-
- ConnectionTuneBody
- tuneBody =
- methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
- frameMax,
- broker.getConnection_heartBeatDelay());
- _connection.writeFrame(tuneBody.generateFrame(0));
- break;
- case CONTINUE:
- ConnectionSecureBody
- secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
- _connection.writeFrame(secureBody.generateFrame(0));
- }
- }
- }
- catch (SaslException e)
- {
- disposeSaslServer();
-
- _connection.closeConnection(AMQConstant.RESOURCE_ERROR, "SASL error: " + e.getMessage(), 0
- );
- }
-
- }
-
- @Override
- public void receiveTxSelect(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, TxSelectBody.INSTANCE);
- }
-
- }
-
- @Override
- public void receiveTxSelectOk(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, TxSelectOkBody.INSTANCE);
- }
-
- }
-
- @Override
- public void receiveTxCommit(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, TxCommitBody.INSTANCE);
- }
-
- }
-
- @Override
- public void receiveTxCommitOk(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, TxCommitOkBody.INSTANCE);
- }
-
- }
-
- @Override
- public void receiveTxRollback(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, TxRollbackBody.INSTANCE);
- }
-
- }
-
- @Override
- public void receiveTxRollbackOk(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, TxRollbackOkBody.INSTANCE);
- }
-
- }
-
- @Override
- public void receiveConnectionSecure(final byte[] challenge)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(0, new ConnectionSecureBody(challenge));
- }
-
- }
-
- @Override
- public void receiveConnectionSecureOk(final byte[] response)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(0, new ConnectionSecureOkBody(response));
- }
-
- }
-
- @Override
- public void receiveConnectionTune(final int channelMax, final long frameMax, final int heartbeat)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(0, new ConnectionTuneBody(channelMax, frameMax, heartbeat));
- }
-
- }
-
- @Override
- public void receiveConnectionTuneOk(final int channelMax, final long frameMax, final int heartbeat)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(0, new ConnectionTuneOkBody(channelMax, frameMax, heartbeat));
- }
-
- }
-
- @Override
- public void receiveConnectionOpen(final AMQShortString virtualHost,
- final AMQShortString capabilities,
- final boolean insist)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(0, new ConnectionOpenBody(virtualHost, capabilities, insist));
- }
-
- }
-
- @Override
- public void receiveConnectionOpenOk(final AMQShortString knownHosts)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(0, new ConnectionOpenOkBody(knownHosts));
- }
-
- }
-
- @Override
- public void receiveConnectionRedirect(final AMQShortString host, final AMQShortString knownHosts)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(0, new ConnectionRedirectBody(getProtocolVersion(), host, knownHosts));
- }
-
- }
-
- @Override
- public void receiveConnectionClose(final int replyCode,
- final AMQShortString replyText,
- final int classId,
- final int methodId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(0,
- new ConnectionCloseBody(getProtocolVersion(),
- replyCode,
- replyText,
- classId,
- methodId));
- }
-
- }
-
- @Override
- public void receiveConnectionCloseOk()
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(0, ProtocolVersion.v8_0.equals(getProtocolVersion())
- ? ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_8
- : ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_9);
- }
- }
-
- @Override
- public void receiveChannelOpen(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new ChannelOpenBody());
- }
-
- }
-
- @Override
- public void receiveChannelOpenOk(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, ProtocolVersion.v8_0.equals(getProtocolVersion())
- ? ChannelOpenOkBody.INSTANCE_0_8
- : ChannelOpenOkBody.INSTANCE_0_9);
- }
- }
-
- @Override
- public void receiveChannelFlow(final int channelId, final boolean active)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new ChannelFlowBody(active));
- }
-
- }
-
- @Override
- public void receiveChannelFlowOk(final int channelId, final boolean active)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new ChannelFlowOkBody(active));
- }
-
- }
-
- @Override
- public void receiveChannelAlert(final int channelId,
- final int replyCode,
- final AMQShortString replyText,
- final FieldTable details)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new ChannelAlertBody(replyCode, replyText, details));
- }
-
- }
-
- @Override
- public void receiveChannelClose(final int channelId,
- final int replyCode,
- final AMQShortString replyText,
- final int classId,
- final int methodId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new ChannelCloseBody(replyCode, replyText, classId, methodId));
- }
-
- }
-
- @Override
- public void receiveChannelCloseOk(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, ChannelCloseOkBody.INSTANCE);
- }
-
- }
-
- @Override
- public void receiveAccessRequest(final int channelId,
- final AMQShortString realm,
- final boolean exclusive,
- final boolean passive,
- final boolean active,
- final boolean write,
- final boolean read)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame =
- new AMQFrame(channelId, new AccessRequestBody(realm, exclusive, passive, active, write, read));
- }
-
- }
-
- @Override
- public void receiveAccessRequestOk(final int channelId, final int ticket)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new AccessRequestOkBody(ticket));
- }
-
- }
-
- @Override
- public void receiveExchangeDeclare(final int channelId,
- final AMQShortString exchange,
- final AMQShortString type,
- final boolean passive,
- final boolean durable,
- final boolean autoDelete,
- final boolean internal,
- final boolean nowait, final FieldTable arguments)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId,
- new ExchangeDeclareBody(0,
- exchange,
- type,
- passive,
- durable,
- autoDelete,
- internal,
- nowait,
- arguments));
- }
-
- }
-
- @Override
- public void receiveExchangeDeclareOk(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new ExchangeDeclareOkBody());
- }
-
- }
-
- @Override
- public void receiveExchangeDelete(final int channelId,
- final AMQShortString exchange,
- final boolean ifUnused,
- final boolean nowait)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new ExchangeDeleteBody(0, exchange, ifUnused, nowait));
- }
-
- }
-
- @Override
- public void receiveExchangeDeleteOk(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new ExchangeDeleteOkBody());
- }
-
- }
-
- @Override
- public void receiveExchangeBound(final int channelId,
- final AMQShortString exchange,
- final AMQShortString routingKey,
- final AMQShortString queue)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new ExchangeBoundBody(exchange, routingKey, queue));
- }
-
- }
-
- @Override
- public void receiveExchangeBoundOk(final int channelId, final int replyCode, final AMQShortString replyText)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new ExchangeBoundOkBody(replyCode, replyText));
- }
-
- }
-
- @Override
- public void receiveQueueBindOk(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new QueueBindOkBody());
- }
-
- }
-
- @Override
- public void receiveQueueUnbindOk(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new QueueUnbindOkBody());
- }
-
- }
-
- @Override
- public void receiveQueueDeclare(final int channelId,
- final AMQShortString queue,
- final boolean passive,
- final boolean durable,
- final boolean exclusive,
- final boolean autoDelete,
- final boolean nowait,
- final FieldTable arguments)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId,
- new QueueDeclareBody(0,
- queue,
- passive,
- durable,
- exclusive,
- autoDelete,
- nowait,
- arguments));
- }
-
- }
-
- @Override
- public void receiveQueueDeclareOk(final int channelId,
- final AMQShortString queue,
- final long messageCount,
- final long consumerCount)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new QueueDeclareOkBody(queue, messageCount, consumerCount));
- }
-
- }
-
- @Override
- public void receiveQueueBind(final int channelId,
- final AMQShortString queue,
- final AMQShortString exchange,
- final AMQShortString bindingKey,
- final boolean nowait,
- final FieldTable arguments)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame =
- new AMQFrame(channelId, new QueueBindBody(0, queue, exchange, bindingKey, nowait, arguments));
- }
-
- }
-
- @Override
- public void receiveQueuePurge(final int channelId, final AMQShortString queue, final boolean nowait)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new QueuePurgeBody(0, queue, nowait));
- }
-
- }
-
- @Override
- public void receiveQueuePurgeOk(final int channelId, final long messageCount)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new QueuePurgeOkBody(messageCount));
- }
-
- }
-
- @Override
- public void receiveQueueDelete(final int channelId,
- final AMQShortString queue,
- final boolean ifUnused,
- final boolean ifEmpty,
- final boolean nowait)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new QueueDeleteBody(0, queue, ifUnused, ifEmpty, nowait));
- }
-
- }
-
- @Override
- public void receiveQueueDeleteOk(final int channelId, final long messageCount)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new QueueDeleteOkBody(messageCount));
- }
-
- }
-
- @Override
- public void receiveQueueUnbind(final int channelId,
- final AMQShortString queue,
- final AMQShortString exchange,
- final AMQShortString bindingKey,
- final FieldTable arguments)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new QueueUnbindBody(0, queue, exchange, bindingKey, arguments));
- }
-
- }
-
- @Override
- public void receiveBasicRecoverSyncOk(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new BasicRecoverSyncOkBody(getProtocolVersion()));
- }
-
- }
-
- @Override
- public void receiveBasicRecover(final int channelId, final boolean requeue, final boolean sync)
- {
- if (ProtocolVersion.v8_0.equals(getProtocolVersion()) || !sync)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new BasicRecoverBody(requeue));
- }
-
- }
- else
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new BasicRecoverSyncBody(getProtocolVersion(), requeue));
- }
-
- }
- }
-
- @Override
- public void receiveBasicQos(final int channelId,
- final long prefetchSize,
- final int prefetchCount,
- final boolean global)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new BasicQosBody(prefetchSize, prefetchCount, global));
- }
-
- }
-
- @Override
- public void receiveBasicQosOk(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new BasicQosOkBody());
- }
-
- }
-
- @Override
- public void receiveBasicConsume(final int channelId,
- final AMQShortString queue,
- final AMQShortString consumerTag,
- final boolean noLocal,
- final boolean noAck,
- final boolean exclusive,
- final boolean nowait,
- final FieldTable arguments)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId,
- new BasicConsumeBody(0,
- queue,
- consumerTag,
- noLocal,
- noAck,
- exclusive,
- nowait,
- arguments));
- }
-
- }
-
- @Override
- public void receiveBasicConsumeOk(final int channelId, final AMQShortString consumerTag)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag));
- }
-
- }
-
- @Override
- public void receiveBasicCancel(final int channelId, final AMQShortString consumerTag, final boolean noWait)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new BasicCancelBody(consumerTag, noWait));
- }
-
- }
-
- @Override
- public void receiveBasicCancelOk(final int channelId, final AMQShortString consumerTag)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new BasicCancelOkBody(consumerTag));
- }
-
- }
-
- @Override
- public void receiveBasicPublish(final int channelId,
- final AMQShortString exchange,
- final AMQShortString routingKey,
- final boolean mandatory,
- final boolean immediate)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame =
- new AMQFrame(channelId, new BasicPublishBody(0, exchange, routingKey, mandatory, immediate));
- }
-
- }
-
- @Override
- public void receiveBasicReturn(final int channelId, final int replyCode,
- final AMQShortString replyText,
- final AMQShortString exchange,
- final AMQShortString routingKey)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new BasicReturnBody(replyCode, replyText, exchange, routingKey));
- }
-
- }
-
- @Override
- public void receiveBasicDeliver(final int channelId,
- final AMQShortString consumerTag,
- final long deliveryTag,
- final boolean redelivered,
- final AMQShortString exchange,
- final AMQShortString routingKey)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId,
- new BasicDeliverBody(consumerTag,
- deliveryTag,
- redelivered,
- exchange,
- routingKey));
- }
-
- }
-
- @Override
- public void receiveBasicGet(final int channelId, final AMQShortString queue, final boolean noAck)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new BasicGetBody(0, queue, noAck));
- }
-
- }
-
- @Override
- public void receiveBasicGetOk(final int channelId,
- final long deliveryTag,
- final boolean redelivered,
- final AMQShortString exchange,
- final AMQShortString routingKey,
- final long messageCount)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId,
- new BasicGetOkBody(deliveryTag,
- redelivered,
- exchange,
- routingKey,
- messageCount));
- }
-
- }
-
- @Override
- public void receiveBasicGetEmpty(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new BasicGetEmptyBody((AMQShortString) null));
- }
-
- }
-
- @Override
- public void receiveBasicAck(final int channelId, final long deliveryTag, final boolean multiple)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new BasicAckBody(deliveryTag, multiple));
- }
-
- }
-
- @Override
- public void receiveBasicReject(final int channelId, final long deliveryTag, final boolean requeue)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new BasicRejectBody(deliveryTag, requeue));
- }
-
- }
-
- @Override
- public void receiveHeartbeat()
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(0, new HeartbeatBody());
- }
-
- }
-
- @Override
- public ProtocolVersion getProtocolVersion()
- {
- return _protocolVersion;
- }
-
- public void setProtocolVersion(final ProtocolVersion protocolVersion)
- {
- _protocolVersion = protocolVersion;
- }
-
- @Override
- public void receiveMessageContent(final int channelId, final byte[] data)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new ContentBody(data));
- }
-
- }
-
- @Override
- public void receiveMessageHeader(final int channelId,
- final BasicContentHeaderProperties properties,
- final long bodySize)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new ContentHeaderBody(properties, bodySize));
- }
-
- }
-
- @Override
- public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQDataBlock frame = protocolInitiation;
- }
-
- }
-
- @Override
- public void setCurrentMethod(final int classId, final int methodId)
- {
- _classId = classId;
- _methodId = methodId;
- }
-
- private void disposeSaslServer()
- {
- SaslServer ss = _connection.getSaslServer();
- if (ss != null)
- {
- _connection.setSaslServer(null);
- try
- {
- ss.dispose();
- }
- catch (SaslException e)
- {
- LOGGER.error("Error disposing of Sasl server: " + e);
- }
- }
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 695b7c3253..bb98c0abbd 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -48,6 +48,7 @@ import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQDecoder;
+import org.apache.qpid.codec.ClientDecoder;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
@@ -193,7 +194,7 @@ public class AMQProtocolHandler implements ProtocolEngine
_connection = con;
_protocolSession = new AMQProtocolSession(this, _connection);
_stateManager = new AMQStateManager(_protocolSession);
- _decoder = new AMQDecoder(false, _protocolSession.getMethodProcessor());
+ _decoder = new ClientDecoder(_protocolSession.getMethodProcessor());
_failoverHandler = new FailoverHandler(this);
}
diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
index b7904303b5..9d98168687 100644
--- a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
+++ b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
@@ -30,14 +30,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
-import org.apache.qpid.framing.AMQDataBlockDecoder;
-import org.apache.qpid.framing.AMQFrameDecodingException;
-import org.apache.qpid.framing.AMQProtocolVersionException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ByteArrayDataInput;
-import org.apache.qpid.framing.EncodingUtils;
-import org.apache.qpid.framing.MethodProcessor;
-import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.protocol.AMQConstant;
/**
* AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a
@@ -51,12 +45,9 @@ import org.apache.qpid.framing.ProtocolInitiation;
* TODO If protocol initiation decoder not needed, then don't create it. Probably not a big deal, but it adds to the
* per-session overhead.
*/
-public class AMQDecoder
+public abstract class AMQDecoder<T extends MethodProcessor>
{
- private final MethodProcessor _methodProcessor;
-
- /** Holds the 'normal' AMQP data decoder. */
- private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
+ private final T _methodProcessor;
/** Holds the protocol initiation decoder. */
private ProtocolInitiation.Decoder _piDecoder = new ProtocolInitiation.Decoder();
@@ -67,6 +58,8 @@ public class AMQDecoder
private boolean _firstRead = true;
+ private int _maxFrameSize = AMQConstant.FRAME_MIN_SIZE.getCode();
+
private List<ByteArrayInputStream> _remainingBufs = new ArrayList<ByteArrayInputStream>();
/**
@@ -75,7 +68,7 @@ public class AMQDecoder
* @param expectProtocolInitiation <tt>true</tt> if this decoder needs to handle protocol initiation.
* @param methodProcessor method processor
*/
- public AMQDecoder(boolean expectProtocolInitiation, MethodProcessor methodProcessor)
+ protected AMQDecoder(boolean expectProtocolInitiation, T methodProcessor)
{
_expectProtocolInitiation = expectProtocolInitiation;
_methodProcessor = methodProcessor;
@@ -96,7 +89,12 @@ public class AMQDecoder
public void setMaxFrameSize(final int frameMax)
{
- _dataBlockDecoder.setMaxFrameSize(frameMax);
+ _maxFrameSize = frameMax;
+ }
+
+ public T getMethodProcessor()
+ {
+ return _methodProcessor;
}
private class RemainingByteArrayInputStream extends InputStream
@@ -254,10 +252,10 @@ public class AMQDecoder
{
if(!_expectProtocolInitiation)
{
- enoughData = _dataBlockDecoder.decodable(msg);
+ enoughData = decodable(msg);
if (enoughData)
{
- _dataBlockDecoder.processInput(_methodProcessor, msg);
+ processInput(msg);
}
}
else
@@ -303,4 +301,105 @@ public class AMQDecoder
}
}
}
+
+ private boolean decodable(final MarkableDataInput in) throws AMQFrameDecodingException, IOException
+ {
+ final int remainingAfterAttributes = in.available() - (1 + 2 + 4 + 1);
+ // type, channel, body length and end byte
+ if (remainingAfterAttributes < 0)
+ {
+ return false;
+ }
+
+ in.mark(8);
+ in.skip(1 + 2);
+
+
+ // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt()
+ final long bodySize = in.readInt() & 0xffffffffL;
+ if (bodySize > _maxFrameSize)
+ {
+ throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR,
+ "Incoming frame size of "
+ + bodySize
+ + " is larger than negotiated maximum of "
+ + _maxFrameSize);
+ }
+ in.reset();
+
+ return (remainingAfterAttributes >= bodySize);
+
+ }
+
+ private void processInput(final MarkableDataInput in)
+ throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
+ {
+ final byte type = in.readByte();
+
+ final int channel = in.readUnsignedShort();
+ final long bodySize = EncodingUtils.readUnsignedInteger(in);
+
+ // bodySize can be zero
+ if ((channel < 0) || (bodySize < 0))
+ {
+ throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR,
+ "Undecodable frame: type = " + type + " channel = " + channel
+ + " bodySize = " + bodySize);
+ }
+
+ processFrame(channel, type, bodySize, in);
+
+ byte marker = in.readByte();
+ if ((marker & 0xFF) != 0xCE)
+ {
+ throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR,
+ "End of frame marker not found. Read " + marker + " length=" + bodySize
+ + " type=" + type);
+ }
+
+ }
+
+ protected void processFrame(final int channel, final byte type, final long bodySize, final MarkableDataInput in)
+ throws AMQFrameDecodingException, IOException
+ {
+ switch (type)
+ {
+ case 1:
+ processMethod(channel, in);
+ break;
+ case 2:
+ ContentHeaderBody.process(in, _methodProcessor.getChannelMethodProcessor(channel), bodySize);
+ break;
+ case 3:
+ ContentBody.process(in, _methodProcessor.getChannelMethodProcessor(channel), bodySize);
+ break;
+ case 8:
+ HeartbeatBody.process(channel, in, _methodProcessor, bodySize);
+ break;
+ default:
+ throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Unsupported frame type: " + type);
+ }
+ }
+
+
+ abstract void processMethod(int channelId,
+ MarkableDataInput in)
+ throws AMQFrameDecodingException, IOException;
+
+ AMQFrameDecodingException newUnknownMethodException(final int classId,
+ final int methodId,
+ ProtocolVersion protocolVersion)
+ {
+ return new AMQFrameDecodingException(AMQConstant.COMMAND_INVALID,
+ "Method "
+ + methodId
+ + " unknown in AMQP version "
+ + protocolVersion
+ + " (while trying to decode class "
+ + classId
+ + " method "
+ + methodId
+ + ".");
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
new file mode 100644
index 0000000000..5048193cac
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
@@ -0,0 +1,258 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.codec;
+
+import java.io.IOException;
+
+import org.apache.qpid.framing.*;
+
+public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends ClientChannelMethodProcessor>>
+{
+
+ /**
+ * Creates a new AMQP decoder.
+ *
+ * @param methodProcessor method processor
+ */
+ public ClientDecoder(final ClientMethodProcessor<? extends ClientChannelMethodProcessor> methodProcessor)
+ {
+ super(false, methodProcessor);
+ }
+
+
+ void processMethod(int channelId,
+ MarkableDataInput in)
+ throws AMQFrameDecodingException, IOException
+ {
+ ClientMethodProcessor<? extends ClientChannelMethodProcessor> methodProcessor = getMethodProcessor();
+ ClientChannelMethodProcessor channelMethodProcessor = methodProcessor.getChannelMethodProcessor(channelId);
+ final int classAndMethod = in.readInt();
+ int classId = classAndMethod >> 16;
+ int methodId = classAndMethod & 0xFFFF;
+ methodProcessor.setCurrentMethod(classId, methodId);
+ try
+ {
+ switch (classAndMethod)
+ {
+ //CONNECTION_CLASS:
+ case 0x000a000a:
+ ConnectionStartBody.process(in, methodProcessor);
+ break;
+ case 0x000a0014:
+ ConnectionSecureBody.process(in, methodProcessor);
+ break;
+ case 0x000a001e:
+ ConnectionTuneBody.process(in, methodProcessor);
+ break;
+ case 0x000a0029:
+ ConnectionOpenOkBody.process(in, methodProcessor);
+ break;
+ case 0x000a002a:
+ ConnectionRedirectBody.process(in, methodProcessor);
+ break;
+ case 0x000a0032:
+ if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0))
+ {
+ ConnectionRedirectBody.process(in, methodProcessor);
+ }
+ else
+ {
+ ConnectionCloseBody.process(in, methodProcessor);
+ }
+ break;
+ case 0x000a0033:
+ if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0))
+ {
+ throw newUnknownMethodException(classId, methodId,
+ methodProcessor.getProtocolVersion());
+ }
+ else
+ {
+ methodProcessor.receiveConnectionCloseOk();
+ }
+ break;
+ case 0x000a003c:
+ if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0))
+ {
+ ConnectionCloseBody.process(in, methodProcessor);
+ }
+ else
+ {
+ throw newUnknownMethodException(classId, methodId,
+ methodProcessor.getProtocolVersion());
+ }
+ break;
+ case 0x000a003d:
+ if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0))
+ {
+ methodProcessor.receiveConnectionCloseOk();
+ }
+ else
+ {
+ throw newUnknownMethodException(classId, methodId,
+ methodProcessor.getProtocolVersion());
+ }
+ break;
+
+ // CHANNEL_CLASS:
+
+ case 0x0014000b:
+ ChannelOpenOkBody.process(in, methodProcessor.getProtocolVersion(), channelMethodProcessor);
+ break;
+ case 0x00140014:
+ ChannelFlowBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00140015:
+ ChannelFlowOkBody.process(in, channelMethodProcessor);
+ break;
+ case 0x0014001e:
+ ChannelAlertBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00140028:
+ ChannelCloseBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00140029:
+ channelMethodProcessor.receiveChannelCloseOk();
+ break;
+
+ // ACCESS_CLASS:
+
+ case 0x001e000b:
+ AccessRequestOkBody.process(in, channelMethodProcessor);
+ break;
+
+ // EXCHANGE_CLASS:
+
+ case 0x0028000b:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveExchangeDeclareOk();
+ }
+ break;
+ case 0x00280015:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveExchangeDeleteOk();
+ }
+ break;
+ case 0x00280017:
+ ExchangeBoundOkBody.process(in, channelMethodProcessor);
+ break;
+
+
+ // QUEUE_CLASS:
+
+ case 0x0032000b:
+ QueueDeclareOkBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00320015:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveQueueBindOk();
+ }
+ break;
+ case 0x0032001f:
+ QueuePurgeOkBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00320029:
+ QueueDeleteOkBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00320033:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveQueueUnbindOk();
+ }
+ break;
+
+
+ // BASIC_CLASS:
+
+ case 0x003c000b:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveBasicQosOk();
+ }
+ break;
+ case 0x003c0015:
+ BasicConsumeOkBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c001f:
+ BasicCancelOkBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c0032:
+ BasicReturnBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c003c:
+ BasicDeliverBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c0047:
+ BasicGetOkBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c0048:
+ BasicGetEmptyBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c0065:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveBasicRecoverSyncOk();
+ }
+ break;
+ case 0x003c006f:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveBasicRecoverSyncOk();
+ }
+ break;
+
+ // TX_CLASS:
+
+ case 0x005a000b:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveTxSelectOk();
+ }
+ break;
+ case 0x005a0015:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveTxCommitOk();
+ }
+ break;
+ case 0x005a001f:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveTxRollbackOk();
+ }
+ break;
+
+ default:
+ throw newUnknownMethodException(classId, methodId,
+ methodProcessor.getProtocolVersion());
+
+ }
+ }
+ finally
+ {
+ methodProcessor.setCurrentMethod(0, 0);
+ }
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
new file mode 100644
index 0000000000..3b138ba278
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
@@ -0,0 +1,234 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.codec;
+
+import java.io.IOException;
+
+import org.apache.qpid.framing.*;
+
+public class ServerDecoder extends AMQDecoder<ServerMethodProcessor<? extends ServerChannelMethodProcessor>>
+{
+
+ /**
+ * Creates a new AMQP decoder.
+ *
+ * @param methodProcessor method processor
+ */
+ public ServerDecoder(final ServerMethodProcessor<? extends ServerChannelMethodProcessor> methodProcessor)
+ {
+ super(true, methodProcessor);
+ }
+
+ void processMethod(int channelId,
+ MarkableDataInput in)
+ throws AMQFrameDecodingException, IOException
+ {
+ ServerMethodProcessor<? extends ServerChannelMethodProcessor> methodProcessor = getMethodProcessor();
+ ServerChannelMethodProcessor channelMethodProcessor = methodProcessor.getChannelMethodProcessor(channelId);
+ final int classAndMethod = in.readInt();
+ int classId = classAndMethod >> 16;
+ int methodId = classAndMethod & 0xFFFF;
+ methodProcessor.setCurrentMethod(classId, methodId);
+ try
+ {
+ switch (classAndMethod)
+ {
+ //CONNECTION_CLASS:
+ case 0x000a000b:
+ ConnectionStartOkBody.process(in, methodProcessor);
+ break;
+ case 0x000a0015:
+ ConnectionSecureOkBody.process(in, methodProcessor);
+ break;
+ case 0x000a001f:
+ ConnectionTuneOkBody.process(in, methodProcessor);
+ break;
+ case 0x000a0028:
+ ConnectionOpenBody.process(in, methodProcessor);
+ break;
+ case 0x000a0032:
+ if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0))
+ {
+ throw newUnknownMethodException(classId, methodId,
+ methodProcessor.getProtocolVersion());
+ }
+ else
+ {
+ ConnectionCloseBody.process(in, methodProcessor);
+ }
+ break;
+ case 0x000a0033:
+ if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0))
+ {
+ throw newUnknownMethodException(classId, methodId,
+ methodProcessor.getProtocolVersion());
+ }
+ else
+ {
+ methodProcessor.receiveConnectionCloseOk();
+ }
+ break;
+ case 0x000a003c:
+ if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0))
+ {
+ ConnectionCloseBody.process(in, methodProcessor);
+ }
+ else
+ {
+ throw newUnknownMethodException(classId, methodId,
+ methodProcessor.getProtocolVersion());
+ }
+ break;
+ case 0x000a003d:
+ if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0))
+ {
+ methodProcessor.receiveConnectionCloseOk();
+ }
+ else
+ {
+ throw newUnknownMethodException(classId, methodId,
+ methodProcessor.getProtocolVersion());
+ }
+ break;
+
+ // CHANNEL_CLASS:
+
+ case 0x0014000a:
+ ChannelOpenBody.process(channelId, in, methodProcessor);
+ break;
+ case 0x00140014:
+ ChannelFlowBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00140015:
+ ChannelFlowOkBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00140028:
+ ChannelCloseBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00140029:
+ channelMethodProcessor.receiveChannelCloseOk();
+ break;
+
+ // ACCESS_CLASS:
+
+ case 0x001e000a:
+ AccessRequestBody.process(in, channelMethodProcessor);
+ break;
+
+ // EXCHANGE_CLASS:
+
+ case 0x0028000a:
+ ExchangeDeclareBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00280014:
+ ExchangeDeleteBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00280016:
+ ExchangeBoundBody.process(in, channelMethodProcessor);
+ break;
+
+
+ // QUEUE_CLASS:
+
+ case 0x0032000a:
+ QueueDeclareBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00320014:
+ QueueBindBody.process(in, channelMethodProcessor);
+ break;
+ case 0x0032001e:
+ QueuePurgeBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00320028:
+ QueueDeleteBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00320032:
+ QueueUnbindBody.process(in, channelMethodProcessor);
+ break;
+
+
+ // BASIC_CLASS:
+
+ case 0x003c000a:
+ BasicQosBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c0014:
+ BasicConsumeBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c001e:
+ BasicCancelBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c0028:
+ BasicPublishBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c0046:
+ BasicGetBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c0050:
+ BasicAckBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c005a:
+ BasicRejectBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c0064:
+ BasicRecoverBody.process(in, methodProcessor.getProtocolVersion(), channelMethodProcessor);
+ break;
+ case 0x003c0066:
+ BasicRecoverSyncBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c006e:
+ BasicRecoverSyncBody.process(in, channelMethodProcessor);
+ break;
+
+ // TX_CLASS:
+
+ case 0x005a000a:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveTxSelect();
+ }
+ break;
+ case 0x005a0014:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveTxCommit();
+ }
+ break;
+ case 0x005a001e:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveTxRollback();
+ }
+ break;
+
+ default:
+ throw newUnknownMethodException(classId, methodId,
+ methodProcessor.getProtocolVersion());
+
+ }
+ }
+ finally
+ {
+ methodProcessor.setCurrentMethod(0, 0);
+ }
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
deleted file mode 100644
index 49a88b6bc1..0000000000
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
+++ /dev/null
@@ -1,404 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.framing;
-
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.codec.MarkableDataInput;
-import org.apache.qpid.protocol.AMQConstant;
-
-public class AMQDataBlockDecoder
-{
-
- private Logger _logger = LoggerFactory.getLogger(AMQDataBlockDecoder.class);
- private int _maxFrameSize = AMQConstant.FRAME_MIN_SIZE.getCode();
-
- public AMQDataBlockDecoder()
- {
- }
-
- public boolean decodable(MarkableDataInput in) throws AMQFrameDecodingException, IOException
- {
- final int remainingAfterAttributes = in.available() - (1 + 2 + 4 + 1);
- // type, channel, body length and end byte
- if (remainingAfterAttributes < 0)
- {
- return false;
- }
-
- in.mark(8);
- in.skip(1 + 2);
-
-
- // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt()
- final long bodySize = in.readInt() & 0xffffffffL;
- if (bodySize > _maxFrameSize)
- {
- throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR,
- "Incoming frame size of "
- + bodySize
- + " is larger than negotiated maximum of "
- + _maxFrameSize);
- }
- in.reset();
-
- return (remainingAfterAttributes >= bodySize);
-
- }
-
- public void processInput(MethodProcessor processor,
- MarkableDataInput in)
- throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
- {
- final byte type = in.readByte();
-
- final int channel = in.readUnsignedShort();
- final long bodySize = EncodingUtils.readUnsignedInteger(in);
-
- // bodySize can be zero
- if ((channel < 0) || (bodySize < 0))
- {
- throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR,
- "Undecodable frame: type = " + type + " channel = " + channel
- + " bodySize = " + bodySize);
- }
-
- switch (type)
- {
- case 1:
- processMethod(channel, in, processor);
- break;
- case 2:
- ContentHeaderBody.process(channel, in, processor, bodySize);
- break;
- case 3:
- ContentBody.process(channel, in, processor, bodySize);
- break;
- case 8:
- HeartbeatBody.process(channel, in, processor, bodySize);
- break;
- default:
- throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Unsupported frame type: " + type);
- }
-
- byte marker = in.readByte();
- if ((marker & 0xFF) != 0xCE)
- {
- throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR,
- "End of frame marker not found. Read " + marker + " length=" + bodySize
- + " type=" + type);
- }
-
- }
-
- public void setMaxFrameSize(final int maxFrameSize)
- {
- _maxFrameSize = maxFrameSize;
- }
-
- private void processMethod(int channelId,
- MarkableDataInput in,
- MethodProcessor dispatcher)
- throws AMQFrameDecodingException, IOException
- {
- final int classAndMethod = in.readInt();
- int classId = classAndMethod >> 16;
- int methodId = classAndMethod & 0xFFFF;
- dispatcher.setCurrentMethod(classId, methodId);
- try
- {
- switch (classAndMethod)
- {
- //CONNECTION_CLASS:
- case 0x000a000a:
- ConnectionStartBody.process(in, dispatcher);
- break;
- case 0x000a000b:
- ConnectionStartOkBody.process(in, dispatcher);
- break;
- case 0x000a0014:
- ConnectionSecureBody.process(in, dispatcher);
- break;
- case 0x000a0015:
- ConnectionSecureOkBody.process(in, dispatcher);
- break;
- case 0x000a001e:
- ConnectionTuneBody.process(in, dispatcher);
- break;
- case 0x000a001f:
- ConnectionTuneOkBody.process(in, dispatcher);
- break;
- case 0x000a0028:
- ConnectionOpenBody.process(in, dispatcher);
- break;
- case 0x000a0029:
- ConnectionOpenOkBody.process(in, dispatcher);
- break;
- case 0x000a002a:
- ConnectionRedirectBody.process(in, dispatcher);
- break;
- case 0x000a0032:
- if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0))
- {
- ConnectionRedirectBody.process(in, dispatcher);
- }
- else
- {
- ConnectionCloseBody.process(in, dispatcher);
- }
- break;
- case 0x000a0033:
- if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0))
- {
- throw newUnknownMethodException(classId, methodId,
- dispatcher.getProtocolVersion());
- }
- else
- {
- dispatcher.receiveConnectionCloseOk();
- }
- break;
- case 0x000a003c:
- if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0))
- {
- ConnectionCloseBody.process(in, dispatcher);
- }
- else
- {
- throw newUnknownMethodException(classId, methodId,
- dispatcher.getProtocolVersion());
- }
- break;
- case 0x000a003d:
- if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0))
- {
- dispatcher.receiveConnectionCloseOk();
- }
- else
- {
- throw newUnknownMethodException(classId, methodId,
- dispatcher.getProtocolVersion());
- }
- break;
-
- // CHANNEL_CLASS:
-
- case 0x0014000a:
- ChannelOpenBody.process(channelId, in, dispatcher);
- break;
- case 0x0014000b:
- ChannelOpenOkBody.process(channelId, in, dispatcher.getProtocolVersion(), dispatcher);
- break;
- case 0x00140014:
- ChannelFlowBody.process(channelId, in, dispatcher);
- break;
- case 0x00140015:
- ChannelFlowOkBody.process(channelId, in, dispatcher);
- break;
- case 0x0014001e:
- ChannelAlertBody.process(channelId, in, dispatcher);
- break;
- case 0x00140028:
- ChannelCloseBody.process(channelId, in, dispatcher);
- break;
- case 0x00140029:
- dispatcher.receiveChannelCloseOk(channelId);
- break;
-
- // ACCESS_CLASS:
-
- case 0x001e000a:
- AccessRequestBody.process(channelId, in, dispatcher);
- break;
- case 0x001e000b:
- AccessRequestOkBody.process(channelId, in, dispatcher);
- break;
-
- // EXCHANGE_CLASS:
-
- case 0x0028000a:
- ExchangeDeclareBody.process(channelId, in, dispatcher);
- break;
- case 0x0028000b:
- dispatcher.receiveExchangeDeclareOk(channelId);
- break;
- case 0x00280014:
- ExchangeDeleteBody.process(channelId, in, dispatcher);
- break;
- case 0x00280015:
- dispatcher.receiveExchangeDeleteOk(channelId);
- break;
- case 0x00280016:
- ExchangeBoundBody.process(channelId, in, dispatcher);
- break;
- case 0x00280017:
- ExchangeBoundOkBody.process(channelId, in, dispatcher);
- break;
-
-
- // QUEUE_CLASS:
-
- case 0x0032000a:
- QueueDeclareBody.process(channelId, in, dispatcher);
- break;
- case 0x0032000b:
- QueueDeclareOkBody.process(channelId, in, dispatcher);
- break;
- case 0x00320014:
- QueueBindBody.process(channelId, in, dispatcher);
- break;
- case 0x00320015:
- dispatcher.receiveQueueBindOk(channelId);
- break;
- case 0x0032001e:
- QueuePurgeBody.process(channelId, in, dispatcher);
- break;
- case 0x0032001f:
- QueuePurgeOkBody.process(channelId, in, dispatcher);
- break;
- case 0x00320028:
- QueueDeleteBody.process(channelId, in, dispatcher);
- break;
- case 0x00320029:
- QueueDeleteOkBody.process(channelId, in, dispatcher);
- break;
- case 0x00320032:
- QueueUnbindBody.process(channelId, in, dispatcher);
- break;
- case 0x00320033:
- dispatcher.receiveQueueUnbindOk(channelId);
- break;
-
-
- // BASIC_CLASS:
-
- case 0x003c000a:
- BasicQosBody.process(channelId, in, dispatcher);
- break;
- case 0x003c000b:
- dispatcher.receiveBasicQosOk(channelId);
- break;
- case 0x003c0014:
- BasicConsumeBody.process(channelId, in, dispatcher);
- break;
- case 0x003c0015:
- BasicConsumeOkBody.process(channelId, in, dispatcher);
- break;
- case 0x003c001e:
- BasicCancelBody.process(channelId, in, dispatcher);
- break;
- case 0x003c001f:
- BasicCancelOkBody.process(channelId, in, dispatcher);
- break;
- case 0x003c0028:
- BasicPublishBody.process(channelId, in, dispatcher);
- break;
- case 0x003c0032:
- BasicReturnBody.process(channelId, in, dispatcher);
- break;
- case 0x003c003c:
- BasicDeliverBody.process(channelId, in, dispatcher);
- break;
- case 0x003c0046:
- BasicGetBody.process(channelId, in, dispatcher);
- break;
- case 0x003c0047:
- BasicGetOkBody.process(channelId, in, dispatcher);
- break;
- case 0x003c0048:
- BasicGetEmptyBody.process(channelId, in, dispatcher);
- break;
- case 0x003c0050:
- BasicAckBody.process(channelId, in, dispatcher);
- break;
- case 0x003c005a:
- BasicRejectBody.process(channelId, in, dispatcher);
- break;
- case 0x003c0064:
- BasicRecoverBody.process(channelId, in, dispatcher.getProtocolVersion(), dispatcher);
- break;
- case 0x003c0065:
- dispatcher.receiveBasicRecoverSyncOk(channelId);
- break;
- case 0x003c0066:
- BasicRecoverSyncBody.process(channelId, in, dispatcher);
- break;
- case 0x003c006e:
- BasicRecoverSyncBody.process(channelId, in, dispatcher);
- break;
- case 0x003c006f:
- dispatcher.receiveBasicRecoverSyncOk(channelId);
- break;
-
- // TX_CLASS:
-
- case 0x005a000a:
- dispatcher.receiveTxSelect(channelId);
- break;
- case 0x005a000b:
- dispatcher.receiveTxSelectOk(channelId);
- break;
- case 0x005a0014:
- dispatcher.receiveTxCommit(channelId);
- break;
- case 0x005a0015:
- dispatcher.receiveTxCommitOk(channelId);
- break;
- case 0x005a001e:
- dispatcher.receiveTxRollback(channelId);
- break;
- case 0x005a001f:
- dispatcher.receiveTxRollbackOk(channelId);
- break;
-
- default:
- throw newUnknownMethodException(classId, methodId,
- dispatcher.getProtocolVersion());
-
- }
- }
- finally
- {
- dispatcher.setCurrentMethod(0,0);
- }
- }
-
- private AMQFrameDecodingException newUnknownMethodException(final int classId,
- final int methodId,
- ProtocolVersion protocolVersion)
- {
- return new AMQFrameDecodingException(AMQConstant.COMMAND_INVALID,
- "Method "
- + methodId
- + " unknown in AMQP version "
- + protocolVersion
- + " (while trying to decode class "
- + classId
- + " method "
- + methodId
- + ".");
- }
-
-
-}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
index ce2a5a1317..8dec50c400 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
@@ -165,9 +165,8 @@ public class AccessRequestBody extends AMQMethodBodyImpl implements EncodableAMQ
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput buffer,
- final MethodProcessor dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer,
+ final ServerChannelMethodProcessor dispatcher) throws IOException
{
AMQShortString realm = buffer.readAMQShortString();
byte bitfield = buffer.readByte();
@@ -176,6 +175,9 @@ public class AccessRequestBody extends AMQMethodBodyImpl implements EncodableAMQ
boolean active = (bitfield & 0x04) == 0x4 ;
boolean write = (bitfield & 0x08) == 0x8 ;
boolean read = (bitfield & 0x10) == 0x10 ;
- dispatcher.receiveAccessRequest(channelId, realm, exclusive, passive, active, write, read);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveAccessRequest(realm, exclusive, passive, active, write, read);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
index 10be4d45c8..7ed0b3602b 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
@@ -95,10 +95,14 @@ public class AccessRequestOkBody extends AMQMethodBodyImpl implements EncodableA
return buf.toString();
}
- public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
+ public static void process(final MarkableDataInput buffer,
+ final ClientChannelMethodProcessor dispatcher)
throws IOException
{
int ticket = buffer.readUnsignedShort();
- dispatcher.receiveAccessRequestOk(channelId, ticket);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveAccessRequestOk(ticket);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
index 70e3f10148..68782231fe 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
@@ -112,13 +112,15 @@ public class BasicAckBody extends AMQMethodBodyImpl implements EncodableAMQDataB
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput buffer,
- final MethodProcessor dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer,
+ final ServerChannelMethodProcessor dispatcher) throws IOException
{
long deliveryTag = buffer.readLong();
boolean multiple = (buffer.readByte() & 0x01) != 0;
- dispatcher.receiveBasicAck(channelId, deliveryTag, multiple);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveBasicAck(deliveryTag, multiple);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
index 6f74b3870a..c9a870e2a5 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
@@ -113,13 +113,15 @@ public class BasicCancelBody extends AMQMethodBodyImpl implements EncodableAMQDa
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput buffer,
- final MethodProcessor dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer,
+ final ServerChannelMethodProcessor dispatcher) throws IOException
{
AMQShortString consumerTag = buffer.readAMQShortString();
boolean noWait = (buffer.readByte() & 0x01) == 0x01;
- dispatcher.receiveBasicCancel(channelId, consumerTag, noWait);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveBasicCancel(consumerTag, noWait);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
index 0e9bc52d66..8d16aa44ec 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
@@ -96,10 +96,14 @@ public class BasicCancelOkBody extends AMQMethodBodyImpl implements EncodableAMQ
return buf.toString();
}
- public static void process(final int channelId, final MarkableDataInput in, final MethodProcessor dispatcher)
+ public static void process(final MarkableDataInput in,
+ final ClientChannelMethodProcessor dispatcher)
throws IOException
{
AMQShortString consumerTag = in.readAMQShortString();
- dispatcher.receiveBasicCancelOk(channelId, consumerTag);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveBasicCancelOk(consumerTag);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
index 94396418fe..502fa07e78 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
@@ -191,7 +191,8 @@ public class BasicConsumeBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
- public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
+ public static void process(final MarkableDataInput buffer,
+ final ServerChannelMethodProcessor dispatcher)
throws IOException, AMQFrameDecodingException
{
@@ -205,6 +206,9 @@ public class BasicConsumeBody extends AMQMethodBodyImpl implements EncodableAMQD
boolean exclusive = (bitfield & 0x04) == 0x04;
boolean nowait = (bitfield & 0x08) == 0x08;
FieldTable arguments = EncodingUtils.readFieldTable(buffer);
- dispatcher.receiveBasicConsume(channelId, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveBasicConsume(queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
index d42c722fdf..d3df7f222a 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
@@ -96,10 +96,14 @@ public class BasicConsumeOkBody extends AMQMethodBodyImpl implements EncodableAM
return buf.toString();
}
- public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
+ public static void process(final MarkableDataInput buffer,
+ final ClientChannelMethodProcessor dispatcher)
throws IOException
{
AMQShortString consumerTag = buffer.readAMQShortString();
- dispatcher.receiveBasicConsumeOk(channelId, consumerTag);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveBasicConsumeOk(consumerTag);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
index afa38d1852..f61ee2d55b 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
@@ -152,9 +152,8 @@ public class BasicDeliverBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput buffer,
- final MethodProcessor dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer,
+ final ClientChannelMethodProcessor dispatcher) throws IOException
{
AMQShortString consumerTag = buffer.readAMQShortString();
@@ -162,6 +161,9 @@ public class BasicDeliverBody extends AMQMethodBodyImpl implements EncodableAMQD
boolean redelivered = (buffer.readByte() & 0x01) != 0;
AMQShortString exchange = buffer.readAMQShortString();
AMQShortString routingKey = buffer.readAMQShortString();
- dispatcher.receiveBasicDeliver(channelId, consumerTag, deliveryTag, redelivered, exchange, routingKey);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java
index 93429b97d8..68a6f2980b 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java
@@ -125,13 +125,17 @@ public class BasicGetBody extends AMQMethodBodyImpl implements EncodableAMQDataB
return buf.toString();
}
- public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
+ public static void process(final MarkableDataInput buffer,
+ final ServerChannelMethodProcessor dispatcher)
throws IOException
{
int ticket = buffer.readUnsignedShort();
AMQShortString queue = buffer.readAMQShortString();
boolean noAck = (buffer.readByte() & 0x01) != 0;
- dispatcher.receiveBasicGet(channelId, queue, noAck);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveBasicGet(queue, noAck);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java
index a42df6bcc7..f37fb632db 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java
@@ -96,11 +96,13 @@ public class BasicGetEmptyBody extends AMQMethodBodyImpl implements EncodableAMQ
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput buffer,
- final MethodProcessor dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer,
+ final ClientChannelMethodProcessor dispatcher) throws IOException
{
AMQShortString clusterId = buffer.readAMQShortString();
- dispatcher.receiveBasicGetEmpty(channelId);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveBasicGetEmpty();
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java
index b8af656a35..37e9bdae5a 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java
@@ -151,15 +151,17 @@ public class BasicGetOkBody extends AMQMethodBodyImpl implements EncodableAMQDat
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput buffer,
- final MethodProcessor dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer,
+ final ClientChannelMethodProcessor dispatcher) throws IOException
{
long deliveryTag = buffer.readLong();
boolean redelivered = (buffer.readByte() & 0x01) != 0;
AMQShortString exchange = buffer.readAMQShortString();
AMQShortString routingKey = buffer.readAMQShortString();
long messageCount = EncodingUtils.readUnsignedInteger(buffer);
- dispatcher.receiveBasicGetOk(channelId, deliveryTag, redelivered, exchange, routingKey, messageCount);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveBasicGetOk(deliveryTag, redelivered, exchange, routingKey, messageCount);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java
index 910942c2f1..8e5d71a804 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java
@@ -151,9 +151,8 @@ public class BasicPublishBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput buffer,
- final MethodProcessor dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer,
+ final ServerChannelMethodProcessor dispatcher) throws IOException
{
int ticket = buffer.readUnsignedShort();
@@ -163,6 +162,9 @@ public class BasicPublishBody extends AMQMethodBodyImpl implements EncodableAMQD
boolean mandatory = (bitfield & 0x01) != 0;
boolean immediate = (bitfield & 0x02) != 0;
- dispatcher.receiveBasicPublish(channelId, exchange, routingKey, mandatory, immediate);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveBasicPublish(exchange, routingKey, mandatory, immediate);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java
index fb6b6956c6..6b7e90f41f 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java
@@ -124,14 +124,16 @@ public class BasicQosBody extends AMQMethodBodyImpl implements EncodableAMQDataB
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput buffer,
- final MethodProcessor dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer,
+ final ServerChannelMethodProcessor dispatcher) throws IOException
{
long prefetchSize = EncodingUtils.readUnsignedInteger(buffer);
int prefetchCount = buffer.readUnsignedShort();
boolean global = (buffer.readByte() & 0x01) == 0x01;
- dispatcher.receiveBasicQos(channelId, prefetchSize, prefetchCount, global);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveBasicQos(prefetchSize, prefetchCount, global);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java
index 2519f25fbe..e5490c4827 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java
@@ -100,14 +100,16 @@ public class BasicRecoverBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput in,
- final ProtocolVersion protocolVersion,
- final MethodProcessor dispatcher) throws IOException
+ public static void process(final MarkableDataInput in,
+ final ProtocolVersion protocolVersion,
+ final ServerChannelMethodProcessor dispatcher) throws IOException
{
boolean requeue = (in.readByte() & 0x01) == 0x01;
boolean sync = (ProtocolVersion.v8_0.equals(protocolVersion));
- dispatcher.receiveBasicRecover(channelId, requeue, sync);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveBasicRecover(requeue, sync);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java
index 16c9798977..f82ee78862 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java
@@ -103,11 +103,13 @@ public class BasicRecoverSyncBody extends AMQMethodBodyImpl implements Encodable
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput in,
- final MethodProcessor dispatcher) throws IOException
+ public static void process(final MarkableDataInput in,
+ final ServerChannelMethodProcessor dispatcher) throws IOException
{
boolean requeue = (in.readByte() & 0x01) == 0x01;
- dispatcher.receiveBasicRecover(channelId, requeue, true);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveBasicRecover(requeue, true);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java
index 8e1ebf4013..8c8757f1d2 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java
@@ -112,13 +112,15 @@ public class BasicRejectBody extends AMQMethodBodyImpl implements EncodableAMQDa
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput buffer,
- final MethodProcessor dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer,
+ final ServerChannelMethodProcessor dispatcher) throws IOException
{
long deliveryTag = buffer.readLong();
boolean requeue = (buffer.readByte() & 0x01) != 0;
- dispatcher.receiveBasicReject(channelId, deliveryTag, requeue);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveBasicReject(deliveryTag, requeue);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java
index cff9914705..afdb343c9f 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java
@@ -134,15 +134,17 @@ public class BasicReturnBody extends AMQMethodBodyImpl implements EncodableAMQDa
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput buffer,
- final MethodProcessor dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer,
+ final ClientChannelMethodProcessor dispatcher) throws IOException
{
int replyCode = buffer.readUnsignedShort();
AMQShortString replyText = buffer.readAMQShortString();
AMQShortString exchange = buffer.readAMQShortString();
AMQShortString routingKey = buffer.readAMQShortString();
- dispatcher.receiveBasicReturn(channelId, replyCode, replyText, exchange, routingKey);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveBasicReturn(replyCode, replyText, exchange, routingKey);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java
index 11dcffc175..289cf2cc10 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java
@@ -121,13 +121,17 @@ public class ChannelAlertBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
- public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
+ public static void process(final MarkableDataInput buffer,
+ final ClientChannelMethodProcessor dispatcher)
throws IOException, AMQFrameDecodingException
{
int replyCode = buffer.readUnsignedShort();
AMQShortString replyText = buffer.readAMQShortString();
FieldTable details = EncodingUtils.readFieldTable(buffer);
- dispatcher.receiveChannelAlert(channelId, replyCode, replyText, details);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveChannelAlert(replyCode, replyText, details);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java
index a4f54fbe7d..a3b92a1fad 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java
@@ -132,15 +132,17 @@ public class ChannelCloseBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput buffer,
- final MethodProcessor dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer,
+ final ChannelMethodProcessor dispatcher) throws IOException
{
int replyCode = buffer.readUnsignedShort();
AMQShortString replyText = buffer.readAMQShortString();
int classId = buffer.readUnsignedShort();
int methodId = buffer.readUnsignedShort();
- dispatcher.receiveChannelClose(channelId, replyCode, replyText, classId, methodId);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveChannelClose(replyCode, replyText, classId, methodId);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java
index c975744d9f..1c3cc47d4e 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java
@@ -92,11 +92,13 @@ public class ChannelFlowBody extends AMQMethodBodyImpl implements EncodableAMQDa
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput buffer,
- final MethodProcessor dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer,
+ final ChannelMethodProcessor dispatcher) throws IOException
{
boolean active = (buffer.readByte() & 0x01) == 0x01;
- dispatcher.receiveChannelFlow(channelId, active);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveChannelFlow(active);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java
index a62c6155f8..9d4a2b09a1 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java
@@ -93,10 +93,14 @@ public class ChannelFlowOkBody extends AMQMethodBodyImpl implements EncodableAMQ
return buf.toString();
}
- public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
+ public static void process(final MarkableDataInput buffer,
+ final ChannelMethodProcessor dispatcher)
throws IOException
{
boolean active = (buffer.readByte() & 0x01) == 0x01;
- dispatcher.receiveChannelFlowOk(channelId, active);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveChannelFlowOk(active);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java
new file mode 100644
index 0000000000..84cd1e13c2
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+public interface ChannelMethodProcessor
+{
+ void receiveChannelFlow(boolean active);
+
+ void receiveChannelFlowOk(boolean active);
+
+ void receiveChannelClose(int replyCode, AMQShortString replyText, int classId, int methodId);
+
+ void receiveChannelCloseOk();
+
+ void receiveMessageContent(byte[] data);
+
+ void receiveMessageHeader(BasicContentHeaderProperties properties, long bodySize);
+
+ boolean ignoreAllButCloseOk();
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java
index 9da45d3d70..af583f5fda 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java
@@ -84,9 +84,12 @@ public class ChannelOpenBody extends AMQMethodBodyImpl implements EncodableAMQDa
public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor dispatcher) throws IOException
+ final ServerMethodProcessor dispatcher) throws IOException
{
buffer.readAMQShortString();
- dispatcher.receiveChannelOpen(channelId);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveChannelOpen(channelId);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java
index 775a08fbd4..e3b4f38a8c 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java
@@ -96,16 +96,18 @@ public class ChannelOpenOkBody extends AMQMethodBodyImpl implements EncodableAMQ
return "[ChannelOpenOkBody]";
}
- public static void process(final int channelId,
- final MarkableDataInput in,
- final ProtocolVersion protocolVersion,
- final MethodProcessor dispatcher) throws IOException
+ public static void process(final MarkableDataInput in,
+ final ProtocolVersion protocolVersion,
+ final ClientChannelMethodProcessor dispatcher) throws IOException
{
if(!ProtocolVersion.v8_0.equals(protocolVersion))
{
EncodingUtils.readBytes(in);
}
- dispatcher.receiveChannelOpenOk(channelId);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveChannelOpenOk();
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java
new file mode 100644
index 0000000000..bef143e39b
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+public interface ClientChannelMethodProcessor extends ChannelMethodProcessor
+{
+ void receiveChannelOpenOk();
+
+ void receiveChannelAlert(int replyCode, final AMQShortString replyText, FieldTable details);
+
+ void receiveAccessRequestOk(int ticket);
+
+ void receiveExchangeDeclareOk();
+
+ void receiveExchangeDeleteOk();
+
+ void receiveExchangeBoundOk(int replyCode, AMQShortString replyText);
+
+ void receiveQueueBindOk();
+
+ void receiveQueueUnbindOk();
+
+ void receiveQueueDeclareOk(final AMQShortString queue, long messageCount, long consumerCount);
+
+ void receiveQueuePurgeOk(long messageCount);
+
+ void receiveQueueDeleteOk(long messageCount);
+
+ void receiveBasicRecoverSyncOk();
+
+ void receiveBasicQosOk();
+
+ void receiveBasicConsumeOk(AMQShortString consumerTag);
+
+ void receiveBasicCancelOk(AMQShortString consumerTag);
+
+ void receiveBasicReturn(int replyCode,
+ AMQShortString replyText,
+ AMQShortString exchange,
+ AMQShortString routingKey);
+
+ void receiveBasicDeliver(AMQShortString consumerTag,
+ long deliveryTag,
+ boolean redelivered,
+ AMQShortString exchange, AMQShortString routingKey);
+
+ void receiveBasicGetOk(long deliveryTag,
+ boolean redelivered,
+ AMQShortString exchange,
+ AMQShortString routingKey, long messageCount);
+
+ void receiveBasicGetEmpty();
+
+ void receiveTxSelectOk();
+
+ void receiveTxCommitOk();
+
+ void receiveTxRollbackOk();
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ClientMethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/ClientMethodProcessor.java
new file mode 100644
index 0000000000..0b599ee40a
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/ClientMethodProcessor.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+public interface ClientMethodProcessor<T extends ClientChannelMethodProcessor> extends MethodProcessor<T>
+{
+ void receiveConnectionStart(short versionMajor,
+ short versionMinor,
+ FieldTable serverProperties,
+ byte[] mechanisms,
+ byte[] locales);
+
+ void receiveConnectionSecure(byte[] challenge);
+
+ void receiveConnectionRedirect(AMQShortString host, AMQShortString knownHosts);
+
+ void receiveConnectionTune(int channelMax, long frameMax, int heartbeat);
+
+ void receiveConnectionOpenOk(AMQShortString knownHosts);
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java
index 0e685deb7c..7fb815ae40 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java
@@ -121,12 +121,15 @@ public class ConnectionOpenBody extends AMQMethodBodyImpl implements EncodableAM
return buf.toString();
}
- public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer, final ServerMethodProcessor dispatcher) throws IOException
{
AMQShortString virtualHost = buffer.readAMQShortString();
AMQShortString capabilities = buffer.readAMQShortString();
boolean insist = (buffer.readByte() & 0x01) == 0x01;
- dispatcher.receiveConnectionOpen(virtualHost, capabilities, insist);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveConnectionOpen(virtualHost, capabilities, insist);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java
index 6d1e80c624..95c48873f3 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java
@@ -96,10 +96,13 @@ public class ConnectionOpenOkBody extends AMQMethodBodyImpl implements Encodable
return buf.toString();
}
- public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer, final ClientMethodProcessor dispatcher) throws IOException
{
AMQShortString knownHosts = buffer.readAMQShortString();
- dispatcher.receiveConnectionOpenOk(knownHosts);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveConnectionOpenOk(knownHosts);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java
index a9b9a43b1a..491cc25125 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java
@@ -108,10 +108,13 @@ public class ConnectionRedirectBody extends AMQMethodBodyImpl implements Encodab
return buf.toString();
}
- public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer, final ClientMethodProcessor dispatcher) throws IOException
{
AMQShortString host = buffer.readAMQShortString();
AMQShortString knownHosts = buffer.readAMQShortString();
- dispatcher.receiveConnectionRedirect(host, knownHosts);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveConnectionRedirect(host, knownHosts);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java
index 1f7f2b0221..e10af3b4c1 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java
@@ -96,11 +96,14 @@ public class ConnectionSecureBody extends AMQMethodBodyImpl implements Encodable
return buf.toString();
}
- public static void process(final MarkableDataInput in, final MethodProcessor dispatcher)
+ public static void process(final MarkableDataInput in, final ClientMethodProcessor dispatcher)
throws IOException, AMQFrameDecodingException
{
byte[] challenge = EncodingUtils.readBytes(in);
- dispatcher.receiveConnectionSecure(challenge);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveConnectionSecure(challenge);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java
index 9a4668a9c7..4c4a249bb6 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java
@@ -96,9 +96,12 @@ public class ConnectionSecureOkBody extends AMQMethodBodyImpl implements Encodab
return buf.toString();
}
- public static void process(final MarkableDataInput in, final MethodProcessor dispatcher) throws IOException
+ public static void process(final MarkableDataInput in, final ServerMethodProcessor dispatcher) throws IOException
{
byte[] response = EncodingUtils.readBytes(in);
- dispatcher.receiveConnectionSecureOk(response);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveConnectionSecureOk(response);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java
index 4f47f0632f..3b94919d4e 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java
@@ -136,7 +136,7 @@ public class ConnectionStartBody extends AMQMethodBodyImpl implements EncodableA
return buf.toString();
}
- public static void process(final MarkableDataInput in, final MethodProcessor dispatcher)
+ public static void process(final MarkableDataInput in, final ClientMethodProcessor dispatcher)
throws IOException, AMQFrameDecodingException
{
short versionMajor = (short) in.readUnsignedByte();
@@ -145,7 +145,9 @@ public class ConnectionStartBody extends AMQMethodBodyImpl implements EncodableA
byte[] mechanisms = EncodingUtils.readBytes(in);
byte[] locales = EncodingUtils.readBytes(in);
-
- dispatcher.receiveConnectionStart(versionMajor, versionMinor, serverProperties, mechanisms, locales);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveConnectionStart(versionMajor, versionMinor, serverProperties, mechanisms, locales);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java
index da3d0a2c56..5b6a8e3ef7 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java
@@ -126,7 +126,7 @@ public class ConnectionStartOkBody extends AMQMethodBodyImpl implements Encodabl
return buf.toString();
}
- public static void process(final MarkableDataInput in, final MethodProcessor dispatcher)
+ public static void process(final MarkableDataInput in, final ServerMethodProcessor dispatcher)
throws IOException, AMQFrameDecodingException
{
@@ -134,7 +134,9 @@ public class ConnectionStartOkBody extends AMQMethodBodyImpl implements Encodabl
AMQShortString mechanism = in.readAMQShortString();
byte[] response = EncodingUtils.readBytes(in);
AMQShortString locale = in.readAMQShortString();
-
- dispatcher.receiveConnectionStartOk(clientProperties, mechanism, response, locale);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveConnectionStartOk(clientProperties, mechanism, response, locale);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java
index 3383fd889a..04def21d44 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java
@@ -119,12 +119,15 @@ public class ConnectionTuneBody extends AMQMethodBodyImpl implements EncodableAM
return buf.toString();
}
- public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer, final ClientMethodProcessor dispatcher) throws IOException
{
int channelMax = buffer.readUnsignedShort();
long frameMax = EncodingUtils.readUnsignedInteger(buffer);
int heartbeat = buffer.readUnsignedShort();
- dispatcher.receiveConnectionTune(channelMax, frameMax, heartbeat);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveConnectionTune(channelMax, frameMax, heartbeat);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java
index f695eda2c4..3141a85766 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java
@@ -119,12 +119,15 @@ public class ConnectionTuneOkBody extends AMQMethodBodyImpl implements Encodable
return buf.toString();
}
- public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer, final ServerMethodProcessor dispatcher) throws IOException
{
int channelMax = buffer.readUnsignedShort();
long frameMax = EncodingUtils.readUnsignedInteger(buffer);
int heartbeat = buffer.readUnsignedShort();
- dispatcher.receiveConnectionTuneOk(channelMax, frameMax, heartbeat);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveConnectionTuneOk(channelMax, frameMax, heartbeat);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
index 01beb3af77..4d9826d83c 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
@@ -21,7 +21,6 @@
package org.apache.qpid.framing;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -73,33 +72,20 @@ public class ContentBody implements AMQBody
session.contentBodyReceived(channelId, this);
}
- protected void populateFromBuffer(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException
- {
- if (size > 0)
- {
- _payload = new byte[(int)size];
- buffer.read(getPayload());
- }
-
- }
-
- public void reduceBufferToFit()
- {
- }
-
public byte[] getPayload()
{
return _payload;
}
- public static void process(final int channel,
- final MarkableDataInput in,
- final MethodProcessor methodProcessor, final long bodySize)
+ public static void process(final MarkableDataInput in,
+ final ChannelMethodProcessor methodProcessor, final long bodySize)
throws IOException
{
+
byte[] payload = new byte[(int)bodySize];
in.readFully(payload);
- methodProcessor.receiveMessageContent(channel, payload);
+
+ methodProcessor.receiveMessageContent(payload);
}
private static class BufferContentBody implements AMQBody
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
index 0d54e09ae5..0d25e4dfba 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
@@ -155,9 +155,8 @@ public class ContentHeaderBody implements AMQBody
_bodySize = bodySize;
}
- public static void process(final int channelId,
- final MarkableDataInput buffer,
- final MethodProcessor methodProcessor, final long size)
+ public static void process(final MarkableDataInput buffer,
+ final ChannelMethodProcessor methodProcessor, final long size)
throws IOException, AMQFrameDecodingException
{
@@ -168,13 +167,13 @@ public class ContentHeaderBody implements AMQBody
BasicContentHeaderProperties properties;
- if (classId != BasicConsumeBody.CLASS_ID)
+ if (classId != CLASS_ID)
{
throw new AMQFrameDecodingException(null, "Unsupported content header class id: " + classId, null);
}
- properties = new BasicContentHeaderProperties();
+ properties = new BasicContentHeaderProperties();
properties.populatePropertiesFromBuffer(buffer, propertyFlags, (int)(size-14));
- methodProcessor.receiveMessageHeader(channelId, properties, bodySize);
+ methodProcessor.receiveMessageHeader(properties, bodySize);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java
index 7548db6e93..e8dc2ae442 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java
@@ -122,13 +122,17 @@ public class ExchangeBoundBody extends AMQMethodBodyImpl implements EncodableAMQ
return buf.toString();
}
- public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
+ public static void process(final MarkableDataInput buffer,
+ final ServerChannelMethodProcessor dispatcher)
throws IOException
{
AMQShortString exchange = buffer.readAMQShortString();
AMQShortString routingKey = buffer.readAMQShortString();
AMQShortString queue = buffer.readAMQShortString();
- dispatcher.receiveExchangeBound(channelId, exchange, routingKey, queue);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveExchangeBound(exchange, routingKey, queue);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java
index 6b02b066ae..ef91c1d635 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java
@@ -115,12 +115,16 @@ public class ExchangeBoundOkBody extends AMQMethodBodyImpl implements EncodableA
return buf.toString();
}
- public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
+ public static void process(final MarkableDataInput buffer,
+ final ClientChannelMethodProcessor dispatcher)
throws IOException
{
int replyCode = buffer.readUnsignedShort();
AMQShortString replyText = buffer.readAMQShortString();
- dispatcher.receiveExchangeBoundOk(channelId, replyCode, replyText);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveExchangeBoundOk(replyCode, replyText);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java
index 06e590f8e5..4001ba7aa0 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java
@@ -204,9 +204,8 @@ public class ExchangeDeclareBody extends AMQMethodBodyImpl implements EncodableA
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput buffer,
- final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException
+ public static void process(final MarkableDataInput buffer,
+ final ServerChannelMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException
{
int ticket = buffer.readUnsignedShort();
@@ -219,14 +218,16 @@ public class ExchangeDeclareBody extends AMQMethodBodyImpl implements EncodableA
boolean internal = (bitfield & 0x8) == 0x8;
boolean nowait = (bitfield & 0x10) == 0x10;
FieldTable arguments = EncodingUtils.readFieldTable(buffer);
- dispatcher.receiveExchangeDeclare(channelId,
- exchange,
- type,
- passive,
- durable,
- autoDelete,
- internal,
- nowait,
- arguments);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveExchangeDeclare(exchange,
+ type,
+ passive,
+ durable,
+ autoDelete,
+ internal,
+ nowait,
+ arguments);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java
index 4a30e25502..f4646315cd 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java
@@ -138,7 +138,8 @@ public class ExchangeDeleteBody extends AMQMethodBodyImpl implements EncodableAM
return buf.toString();
}
- public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
+ public static void process(final MarkableDataInput buffer,
+ final ServerChannelMethodProcessor dispatcher)
throws IOException
{
@@ -147,6 +148,9 @@ public class ExchangeDeleteBody extends AMQMethodBodyImpl implements EncodableAM
byte bitfield = buffer.readByte();
boolean ifUnused = (bitfield & 0x01) == 0x01;
boolean nowait = (bitfield & 0x02) == 0x02;
- dispatcher.receiveExchangeDelete(channelId, exchange, ifUnused, nowait);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveExchangeDelete(exchange, ifUnused, nowait);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
index 1ad0f3081b..19b091a359 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
@@ -23,7 +23,9 @@ package org.apache.qpid.framing;
import java.util.ArrayList;
import java.util.List;
-public class FrameCreatingMethodProcessor implements MethodProcessor
+public class FrameCreatingMethodProcessor implements MethodProcessor<FrameCreatingMethodProcessor.ClientAndServerChannelMethodProcessor>,
+ ClientMethodProcessor<FrameCreatingMethodProcessor.ClientAndServerChannelMethodProcessor>,
+ ServerMethodProcessor<FrameCreatingMethodProcessor.ClientAndServerChannelMethodProcessor>
{
private ProtocolVersion _protocolVersion;
@@ -61,42 +63,6 @@ public class FrameCreatingMethodProcessor implements MethodProcessor
}
@Override
- public void receiveTxSelect(final int channelId)
- {
- _processedMethods.add(new AMQFrame(channelId, TxSelectBody.INSTANCE));
- }
-
- @Override
- public void receiveTxSelectOk(final int channelId)
- {
- _processedMethods.add(new AMQFrame(channelId, TxSelectOkBody.INSTANCE));
- }
-
- @Override
- public void receiveTxCommit(final int channelId)
- {
- _processedMethods.add(new AMQFrame(channelId, TxCommitBody.INSTANCE));
- }
-
- @Override
- public void receiveTxCommitOk(final int channelId)
- {
- _processedMethods.add(new AMQFrame(channelId, TxCommitOkBody.INSTANCE));
- }
-
- @Override
- public void receiveTxRollback(final int channelId)
- {
- _processedMethods.add(new AMQFrame(channelId, TxRollbackBody.INSTANCE));
- }
-
- @Override
- public void receiveTxRollbackOk(final int channelId)
- {
- _processedMethods.add(new AMQFrame(channelId, TxRollbackOkBody.INSTANCE));
- }
-
- @Override
public void receiveConnectionSecure(final byte[] challenge)
{
_processedMethods.add(new AMQFrame(0, new ConnectionSecureBody(challenge)));
@@ -163,382 +129,483 @@ public class FrameCreatingMethodProcessor implements MethodProcessor
_processedMethods.add(new AMQFrame(channelId, new ChannelOpenBody()));
}
- @Override
- public void receiveChannelOpenOk(final int channelId)
+ private void receiveExchangeBoundOk(final int channelId, final int replyCode, final AMQShortString replyText)
{
- _processedMethods.add(new AMQFrame(channelId, ProtocolVersion.v8_0.equals(getProtocolVersion())
- ? ChannelOpenOkBody.INSTANCE_0_8
- : ChannelOpenOkBody.INSTANCE_0_9));
+ _processedMethods.add(new AMQFrame(channelId, new ExchangeBoundOkBody(replyCode, replyText)));
}
@Override
- public void receiveChannelFlow(final int channelId, final boolean active)
+ public void receiveHeartbeat()
{
- _processedMethods.add(new AMQFrame(channelId, new ChannelFlowBody(active)));
+ _processedMethods.add(new AMQFrame(0, new HeartbeatBody()));
}
@Override
- public void receiveChannelFlowOk(final int channelId, final boolean active)
+ public ProtocolVersion getProtocolVersion()
{
- _processedMethods.add(new AMQFrame(channelId, new ChannelFlowOkBody(active)));
+ return _protocolVersion;
}
@Override
- public void receiveChannelAlert(final int channelId,
- final int replyCode,
- final AMQShortString replyText,
- final FieldTable details)
+ public ClientAndServerChannelMethodProcessor getChannelMethodProcessor(final int channelId)
{
- _processedMethods.add(new AMQFrame(channelId, new ChannelAlertBody(replyCode, replyText, details)));
+ return new FrameCreatingChannelMethodProcessor(channelId);
}
- @Override
- public void receiveChannelClose(final int channelId,
- final int replyCode,
- final AMQShortString replyText,
- final int classId,
- final int methodId)
+ public void setProtocolVersion(final ProtocolVersion protocolVersion)
{
- _processedMethods.add(new AMQFrame(channelId, new ChannelCloseBody(replyCode, replyText, classId, methodId)));
+ _protocolVersion = protocolVersion;
}
@Override
- public void receiveChannelCloseOk(final int channelId)
+ public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation)
{
- _processedMethods.add(new AMQFrame(channelId, ChannelCloseOkBody.INSTANCE));
+ _processedMethods.add(protocolInitiation);
}
@Override
- public void receiveAccessRequest(final int channelId,
- final AMQShortString realm,
- final boolean exclusive,
- final boolean passive,
- final boolean active,
- final boolean write,
- final boolean read)
+ public void setCurrentMethod(final int classId, final int methodId)
{
- _processedMethods.add(new AMQFrame(channelId, new AccessRequestBody(realm, exclusive, passive, active, write, read)));
+ _classId = classId;
+ _methodId = methodId;
}
@Override
- public void receiveAccessRequestOk(final int channelId, final int ticket)
+ public boolean ignoreAllButCloseOk()
{
- _processedMethods.add(new AMQFrame(channelId, new AccessRequestOkBody(ticket)));
+ return false;
}
- @Override
- public void receiveExchangeDeclare(final int channelId,
- final AMQShortString exchange,
- final AMQShortString type,
- final boolean passive,
- final boolean durable,
- final boolean autoDelete,
- final boolean internal,
- final boolean nowait, final FieldTable arguments)
+ public int getClassId()
{
- _processedMethods.add(new AMQFrame(channelId, new ExchangeDeclareBody(0, exchange, type, passive, durable, autoDelete, internal, nowait, arguments)));
+ return _classId;
}
- @Override
- public void receiveExchangeDeclareOk(final int channelId)
+ public int getMethodId()
{
- _processedMethods.add(new AMQFrame(channelId, new ExchangeDeclareOkBody()));
+ return _methodId;
}
- @Override
- public void receiveExchangeDelete(final int channelId,
- final AMQShortString exchange,
- final boolean ifUnused,
- final boolean nowait)
+ public static interface ClientAndServerChannelMethodProcessor extends ServerChannelMethodProcessor, ClientChannelMethodProcessor
{
- _processedMethods.add(new AMQFrame(channelId, new ExchangeDeleteBody(0, exchange, ifUnused, nowait)));
- }
- @Override
- public void receiveExchangeDeleteOk(final int channelId)
- {
- _processedMethods.add(new AMQFrame(channelId, new ExchangeDeleteOkBody()));
}
- @Override
- public void receiveExchangeBound(final int channelId,
- final AMQShortString exchange,
- final AMQShortString routingKey,
- final AMQShortString queue)
+ private class FrameCreatingChannelMethodProcessor implements ClientAndServerChannelMethodProcessor
{
- _processedMethods.add(new AMQFrame(channelId, new ExchangeBoundBody(exchange, routingKey, queue)));
- }
+ private final int _channelId;
- @Override
- public void receiveExchangeBoundOk(final int channelId, final int replyCode, final AMQShortString replyText)
- {
- _processedMethods.add(new AMQFrame(channelId, new ExchangeBoundOkBody(replyCode, replyText)));
- }
+ private FrameCreatingChannelMethodProcessor(final int channelId)
+ {
+ _channelId = channelId;
+ }
- @Override
- public void receiveQueueBindOk(final int channelId)
- {
- _processedMethods.add(new AMQFrame(channelId, new QueueBindOkBody()));
- }
- @Override
- public void receiveQueueUnbindOk(final int channelId)
- {
- _processedMethods.add(new AMQFrame(channelId, new QueueUnbindOkBody()));
- }
+ @Override
+ public void receiveChannelOpenOk()
+ {
+ _processedMethods.add(new AMQFrame(_channelId, ProtocolVersion.v8_0.equals(getProtocolVersion())
+ ? ChannelOpenOkBody.INSTANCE_0_8
+ : ChannelOpenOkBody.INSTANCE_0_9));
+ }
- @Override
- public void receiveQueueDeclare(final int channelId,
- final AMQShortString queue,
- final boolean passive,
- final boolean durable,
- final boolean exclusive,
- final boolean autoDelete,
- final boolean nowait,
- final FieldTable arguments)
- {
- _processedMethods.add(new AMQFrame(channelId, new QueueDeclareBody(0, queue, passive, durable, exclusive, autoDelete, nowait, arguments)));
- }
+ @Override
+ public void receiveChannelAlert(final int replyCode, final AMQShortString replyText, final FieldTable details)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new ChannelAlertBody(replyCode, replyText, details)));
+ }
- @Override
- public void receiveQueueDeclareOk(final int channelId,
- final AMQShortString queue,
- final long messageCount,
- final long consumerCount)
- {
- _processedMethods.add(new AMQFrame(channelId, new QueueDeclareOkBody(queue, messageCount, consumerCount)));
- }
+ @Override
+ public void receiveAccessRequestOk(final int ticket)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new AccessRequestOkBody(ticket)));
+ }
- @Override
- public void receiveQueueBind(final int channelId,
- final AMQShortString queue,
- final AMQShortString exchange,
- final AMQShortString bindingKey,
- final boolean nowait,
- final FieldTable arguments)
- {
- _processedMethods.add(new AMQFrame(channelId, new QueueBindBody(0, queue, exchange, bindingKey, nowait, arguments)));
- }
+ @Override
+ public void receiveExchangeDeclareOk()
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new ExchangeDeclareOkBody()));
+ }
- @Override
- public void receiveQueuePurge(final int channelId, final AMQShortString queue, final boolean nowait)
- {
- _processedMethods.add(new AMQFrame(channelId, new QueuePurgeBody(0, queue, nowait)));
- }
+ @Override
+ public void receiveExchangeDeleteOk()
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new ExchangeDeleteOkBody()));
+ }
- @Override
- public void receiveQueuePurgeOk(final int channelId, final long messageCount)
- {
- _processedMethods.add(new AMQFrame(channelId, new QueuePurgeOkBody(messageCount)));
- }
+ @Override
+ public void receiveExchangeBoundOk(final int replyCode, final AMQShortString replyText)
+ {
+ FrameCreatingMethodProcessor.this.receiveExchangeBoundOk(_channelId, replyCode, replyText);
+ }
- @Override
- public void receiveQueueDelete(final int channelId,
- final AMQShortString queue,
- final boolean ifUnused,
- final boolean ifEmpty,
- final boolean nowait)
- {
- _processedMethods.add(new AMQFrame(channelId, new QueueDeleteBody(0, queue, ifUnused, ifEmpty, nowait)));
- }
+ @Override
+ public void receiveQueueBindOk()
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new QueueBindOkBody()));
+ }
- @Override
- public void receiveQueueDeleteOk(final int channelId, final long messageCount)
- {
- _processedMethods.add(new AMQFrame(channelId, new QueueDeleteOkBody(messageCount)));
- }
+ @Override
+ public void receiveQueueUnbindOk()
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new QueueUnbindOkBody()));
+ }
- @Override
- public void receiveQueueUnbind(final int channelId,
- final AMQShortString queue,
- final AMQShortString exchange,
- final AMQShortString bindingKey,
- final FieldTable arguments)
- {
- _processedMethods.add(new AMQFrame(channelId, new QueueUnbindBody(0, queue, exchange, bindingKey, arguments)));
- }
+ @Override
+ public void receiveQueueDeclareOk(final AMQShortString queue, final long messageCount, final long consumerCount)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new QueueDeclareOkBody(queue, messageCount, consumerCount)));
+ }
- @Override
- public void receiveBasicRecoverSyncOk(final int channelId)
- {
- _processedMethods.add(new AMQFrame(channelId, new BasicRecoverSyncOkBody(getProtocolVersion())));
- }
+ @Override
+ public void receiveQueuePurgeOk(final long messageCount)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new QueuePurgeOkBody(messageCount)));
+ }
- @Override
- public void receiveBasicRecover(final int channelId, final boolean requeue, final boolean sync)
- {
- if(ProtocolVersion.v8_0.equals(getProtocolVersion()) || !sync)
+ @Override
+ public void receiveQueueDeleteOk(final long messageCount)
{
- _processedMethods.add(new AMQFrame(channelId, new BasicRecoverBody(requeue)));
+ _processedMethods.add(new AMQFrame(_channelId, new QueueDeleteOkBody(messageCount)));
}
- else
+
+ @Override
+ public void receiveBasicRecoverSyncOk()
{
- _processedMethods.add(new AMQFrame(channelId, new BasicRecoverSyncBody(getProtocolVersion(), requeue)));
+ _processedMethods.add(new AMQFrame(_channelId, new BasicRecoverSyncOkBody(getProtocolVersion())));
}
- }
- @Override
- public void receiveBasicQos(final int channelId,
- final long prefetchSize,
- final int prefetchCount,
- final boolean global)
- {
- _processedMethods.add(new AMQFrame(channelId, new BasicQosBody(prefetchSize, prefetchCount, global)));
- }
+ @Override
+ public void receiveBasicQosOk()
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new BasicQosOkBody()));
+ }
- @Override
- public void receiveBasicQosOk(final int channelId)
- {
- _processedMethods.add(new AMQFrame(channelId, new BasicQosOkBody()));
- }
+ @Override
+ public void receiveBasicConsumeOk(final AMQShortString consumerTag)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new BasicConsumeOkBody(consumerTag)));
+ }
- @Override
- public void receiveBasicConsume(final int channelId,
- final AMQShortString queue,
- final AMQShortString consumerTag,
- final boolean noLocal,
- final boolean noAck,
- final boolean exclusive,
- final boolean nowait,
- final FieldTable arguments)
- {
- _processedMethods.add(new AMQFrame(channelId, new BasicConsumeBody(0, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments)));
- }
+ @Override
+ public void receiveBasicCancelOk(final AMQShortString consumerTag)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new BasicCancelOkBody(consumerTag)));
+ }
- @Override
- public void receiveBasicConsumeOk(final int channelId, final AMQShortString consumerTag)
- {
- _processedMethods.add(new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag)));
- }
+ @Override
+ public void receiveBasicReturn(final int replyCode,
+ final AMQShortString replyText,
+ final AMQShortString exchange,
+ final AMQShortString routingKey)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new BasicReturnBody(replyCode,
+ replyText,
+ exchange,
+ routingKey)));
+ }
- @Override
- public void receiveBasicCancel(final int channelId, final AMQShortString consumerTag, final boolean noWait)
- {
- _processedMethods.add(new AMQFrame(channelId, new BasicCancelBody(consumerTag, noWait)));
- }
+ @Override
+ public void receiveBasicDeliver(final AMQShortString consumerTag,
+ final long deliveryTag,
+ final boolean redelivered,
+ final AMQShortString exchange,
+ final AMQShortString routingKey)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new BasicDeliverBody(consumerTag,
+ deliveryTag,
+ redelivered,
+ exchange,
+ routingKey)));
+ }
- @Override
- public void receiveBasicCancelOk(final int channelId, final AMQShortString consumerTag)
- {
- _processedMethods.add(new AMQFrame(channelId, new BasicCancelOkBody(consumerTag)));
- }
+ @Override
+ public void receiveBasicGetOk(final long deliveryTag,
+ final boolean redelivered,
+ final AMQShortString exchange,
+ final AMQShortString routingKey,
+ final long messageCount)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new BasicGetOkBody(deliveryTag,
+ redelivered,
+ exchange,
+ routingKey,
+ messageCount)));
+ }
- @Override
- public void receiveBasicPublish(final int channelId,
- final AMQShortString exchange,
- final AMQShortString routingKey,
- final boolean mandatory,
- final boolean immediate)
- {
- _processedMethods.add(new AMQFrame(channelId, new BasicPublishBody(0, exchange, routingKey, mandatory, immediate)));
- }
+ @Override
+ public void receiveBasicGetEmpty()
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new BasicGetEmptyBody((AMQShortString)null)));
+ }
- @Override
- public void receiveBasicReturn(final int channelId, final int replyCode,
- final AMQShortString replyText,
- final AMQShortString exchange,
- final AMQShortString routingKey)
- {
- _processedMethods.add(new AMQFrame(channelId, new BasicReturnBody(replyCode, replyText, exchange, routingKey)));
- }
+ @Override
+ public void receiveTxSelectOk()
+ {
+ _processedMethods.add(new AMQFrame(_channelId, TxSelectOkBody.INSTANCE));
+ }
- @Override
- public void receiveBasicDeliver(final int channelId,
- final AMQShortString consumerTag,
- final long deliveryTag,
- final boolean redelivered,
- final AMQShortString exchange,
- final AMQShortString routingKey)
- {
- _processedMethods.add(new AMQFrame(channelId, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey)));
- }
+ @Override
+ public void receiveTxCommitOk()
+ {
+ _processedMethods.add(new AMQFrame(_channelId, TxCommitOkBody.INSTANCE));
+ }
- @Override
- public void receiveBasicGet(final int channelId, final AMQShortString queue, final boolean noAck)
- {
- _processedMethods.add(new AMQFrame(channelId, new BasicGetBody(0, queue, noAck)));
- }
+ @Override
+ public void receiveTxRollbackOk()
+ {
+ _processedMethods.add(new AMQFrame(_channelId, TxRollbackOkBody.INSTANCE));
+ }
- @Override
- public void receiveBasicGetOk(final int channelId,
- final long deliveryTag,
- final boolean redelivered,
- final AMQShortString exchange,
- final AMQShortString routingKey,
- final long messageCount)
- {
- _processedMethods.add(new AMQFrame(channelId, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount)));
- }
+ @Override
+ public void receiveAccessRequest(final AMQShortString realm,
+ final boolean exclusive,
+ final boolean passive,
+ final boolean active,
+ final boolean write,
+ final boolean read)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new AccessRequestBody(realm,
+ exclusive,
+ passive,
+ active,
+ write,
+ read)));
+ }
- @Override
- public void receiveBasicGetEmpty(final int channelId)
- {
- _processedMethods.add(new AMQFrame(channelId, new BasicGetEmptyBody((AMQShortString)null)));
- }
+ @Override
+ public void receiveExchangeDeclare(final AMQShortString exchange,
+ final AMQShortString type,
+ final boolean passive,
+ final boolean durable,
+ final boolean autoDelete,
+ final boolean internal,
+ final boolean nowait,
+ final FieldTable arguments)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new ExchangeDeclareBody(0,
+ exchange,
+ type,
+ passive,
+ durable,
+ autoDelete,
+ internal,
+ nowait,
+ arguments)));
+ }
- @Override
- public void receiveBasicAck(final int channelId, final long deliveryTag, final boolean multiple)
- {
- _processedMethods.add(new AMQFrame(channelId, new BasicAckBody(deliveryTag, multiple)));
- }
+ @Override
+ public void receiveExchangeDelete(final AMQShortString exchange, final boolean ifUnused, final boolean nowait)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new ExchangeDeleteBody(0, exchange, ifUnused, nowait)));
+ }
- @Override
- public void receiveBasicReject(final int channelId, final long deliveryTag, final boolean requeue)
- {
- _processedMethods.add(new AMQFrame(channelId, new BasicRejectBody(deliveryTag, requeue)));
- }
+ @Override
+ public void receiveExchangeBound(final AMQShortString exchange,
+ final AMQShortString routingKey,
+ final AMQShortString queue)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new ExchangeBoundBody(exchange, routingKey, queue)));
+ }
- @Override
- public void receiveHeartbeat()
- {
- _processedMethods.add(new AMQFrame(0, new HeartbeatBody()));
- }
+ @Override
+ public void receiveQueueDeclare(final AMQShortString queue,
+ final boolean passive,
+ final boolean durable,
+ final boolean exclusive,
+ final boolean autoDelete,
+ final boolean nowait,
+ final FieldTable arguments)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new QueueDeclareBody(0,
+ queue,
+ passive,
+ durable,
+ exclusive,
+ autoDelete,
+ nowait,
+ arguments)));
+ }
- @Override
- public ProtocolVersion getProtocolVersion()
- {
- return _protocolVersion;
- }
+ @Override
+ public void receiveQueueBind(final AMQShortString queue,
+ final AMQShortString exchange,
+ final AMQShortString bindingKey,
+ final boolean nowait,
+ final FieldTable arguments)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new QueueBindBody(0,
+ queue,
+ exchange,
+ bindingKey,
+ nowait,
+ arguments)));
+ }
- public void setProtocolVersion(final ProtocolVersion protocolVersion)
- {
- _protocolVersion = protocolVersion;
- }
+ @Override
+ public void receiveQueuePurge(final AMQShortString queue, final boolean nowait)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new QueuePurgeBody(0, queue, nowait)));
+ }
- @Override
- public void receiveMessageContent(final int channelId, final byte[] data)
- {
- _processedMethods.add(new AMQFrame(channelId, new ContentBody(data)));
- }
+ @Override
+ public void receiveQueueDelete(final AMQShortString queue,
+ final boolean ifUnused,
+ final boolean ifEmpty,
+ final boolean nowait)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new QueueDeleteBody(0, queue, ifUnused, ifEmpty, nowait)));
+ }
- @Override
- public void receiveMessageHeader(final int channelId,
- final BasicContentHeaderProperties properties,
- final long bodySize)
- {
- _processedMethods.add(new AMQFrame(channelId, new ContentHeaderBody(properties, bodySize)));
- }
+ @Override
+ public void receiveQueueUnbind(final AMQShortString queue,
+ final AMQShortString exchange,
+ final AMQShortString bindingKey,
+ final FieldTable arguments)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new QueueUnbindBody(0,
+ queue,
+ exchange,
+ bindingKey,
+ arguments)));
+ }
- @Override
- public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation)
- {
- _processedMethods.add(protocolInitiation);
- }
+ @Override
+ public void receiveBasicRecover(final boolean requeue, final boolean sync)
+ {
+ if(ProtocolVersion.v8_0.equals(getProtocolVersion()) || !sync)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new BasicRecoverBody(requeue)));
+ }
+ else
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new BasicRecoverSyncBody(getProtocolVersion(), requeue)));
+ }
+ }
- @Override
- public void setCurrentMethod(final int classId, final int methodId)
- {
- _classId = classId;
- _methodId = methodId;
- }
+ @Override
+ public void receiveBasicQos(final long prefetchSize, final int prefetchCount, final boolean global)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new BasicQosBody(prefetchSize, prefetchCount, global)));
+ }
- public int getClassId()
- {
- return _classId;
- }
+ @Override
+ public void receiveBasicConsume(final AMQShortString queue,
+ final AMQShortString consumerTag,
+ final boolean noLocal,
+ final boolean noAck,
+ final boolean exclusive,
+ final boolean nowait,
+ final FieldTable arguments)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new BasicConsumeBody(0,
+ queue,
+ consumerTag,
+ noLocal,
+ noAck,
+ exclusive,
+ nowait,
+ arguments)));
+ }
- public int getMethodId()
- {
- return _methodId;
+ @Override
+ public void receiveBasicCancel(final AMQShortString consumerTag, final boolean noWait)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new BasicCancelBody(consumerTag, noWait)));
+ }
+
+ @Override
+ public void receiveBasicPublish(final AMQShortString exchange,
+ final AMQShortString routingKey,
+ final boolean mandatory,
+ final boolean immediate)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new BasicPublishBody(0,
+ exchange,
+ routingKey,
+ mandatory,
+ immediate)));
+ }
+
+ @Override
+ public void receiveBasicGet(final AMQShortString queue, final boolean noAck)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new BasicGetBody(0, queue, noAck)));
+ }
+
+ @Override
+ public void receiveBasicAck(final long deliveryTag, final boolean multiple)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new BasicAckBody(deliveryTag, multiple)));
+ }
+
+ @Override
+ public void receiveBasicReject(final long deliveryTag, final boolean requeue)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new BasicRejectBody(deliveryTag, requeue)));
+ }
+
+ @Override
+ public void receiveTxSelect()
+ {
+ _processedMethods.add(new AMQFrame(_channelId, TxSelectBody.INSTANCE));
+ }
+
+ @Override
+ public void receiveTxCommit()
+ {
+ _processedMethods.add(new AMQFrame(_channelId, TxCommitBody.INSTANCE));
+ }
+
+ @Override
+ public void receiveTxRollback()
+ {
+ _processedMethods.add(new AMQFrame(_channelId, TxRollbackBody.INSTANCE));
+ }
+
+ @Override
+ public void receiveChannelFlow(final boolean active)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new ChannelFlowBody(active)));
+ }
+
+ @Override
+ public void receiveChannelFlowOk(final boolean active)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new ChannelFlowOkBody(active)));
+ }
+
+ @Override
+ public void receiveChannelClose(final int replyCode,
+ final AMQShortString replyText,
+ final int classId,
+ final int methodId)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new ChannelCloseBody(replyCode, replyText, classId, methodId)));
+ }
+
+ @Override
+ public void receiveChannelCloseOk()
+ {
+ _processedMethods.add(new AMQFrame(_channelId, ChannelCloseOkBody.INSTANCE));
+ }
+
+ @Override
+ public void receiveMessageContent(final byte[] data)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new ContentBody(data)));
+ }
+
+ @Override
+ public void receiveMessageHeader(final BasicContentHeaderProperties properties, final long bodySize)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new ContentHeaderBody(properties, bodySize)));
+ }
+
+ @Override
+ public boolean ignoreAllButCloseOk()
+ {
+ return false;
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java
index 0b08059631..62c0cd3c6d 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java
@@ -20,184 +20,21 @@
*/
package org.apache.qpid.framing;
-public interface MethodProcessor
+public interface MethodProcessor<T extends ChannelMethodProcessor>
{
ProtocolVersion getProtocolVersion();
- void receiveConnectionStart(short versionMajor,
- short versionMinor,
- FieldTable serverProperties,
- byte[] mechanisms,
- byte[] locales);
-
- void receiveConnectionStartOk(FieldTable clientProperties,
- AMQShortString mechanism,
- byte[] response,
- AMQShortString locale);
-
- void receiveTxSelect(int channelId);
-
- void receiveTxSelectOk(int channelId);
-
- void receiveTxCommit(int channelId);
-
- void receiveTxCommitOk(int channelId);
-
- void receiveTxRollback(int channelId);
-
- void receiveTxRollbackOk(int channelId);
-
- void receiveConnectionSecure(byte[] challenge);
-
- void receiveConnectionSecureOk(byte[] response);
-
- void receiveConnectionTune(int channelMax, long frameMax, int heartbeat);
-
- void receiveConnectionTuneOk(int channelMax, long frameMax, int heartbeat);
-
- void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist);
-
- void receiveConnectionOpenOk(AMQShortString knownHosts);
-
- void receiveConnectionRedirect(AMQShortString host, AMQShortString knownHosts);
+ T getChannelMethodProcessor(int channelId);
void receiveConnectionClose(int replyCode, AMQShortString replyText, int classId, int methodId);
void receiveConnectionCloseOk();
- void receiveChannelOpen(int channelId);
-
- void receiveChannelOpenOk(int channelId);
-
- void receiveChannelFlow(int channelId, boolean active);
-
- void receiveChannelFlowOk(int channelId, boolean active);
-
- void receiveChannelAlert(int channelId, int replyCode, final AMQShortString replyText, FieldTable details);
-
- void receiveChannelClose(int channelId, int replyCode, AMQShortString replyText, int classId, int methodId);
-
- void receiveChannelCloseOk(int channelId);
-
- void receiveAccessRequest(int channelId,
- AMQShortString realm,
- boolean exclusive,
- boolean passive,
- boolean active,
- boolean write, boolean read);
-
- void receiveAccessRequestOk(int channelId, int ticket);
-
- void receiveExchangeDeclare(int channelId,
- AMQShortString exchange,
- AMQShortString type,
- boolean passive,
- boolean durable,
- boolean autoDelete, boolean internal, boolean nowait, final FieldTable arguments);
-
- void receiveExchangeDeclareOk(int channelId);
-
- void receiveExchangeDelete(int channelId, AMQShortString exchange, boolean ifUnused, boolean nowait);
-
- void receiveExchangeDeleteOk(int channelId);
-
- void receiveExchangeBound(int channelId, AMQShortString exchange, AMQShortString routingKey, AMQShortString queue);
-
- void receiveExchangeBoundOk(int channelId, int replyCode, AMQShortString replyText);
-
- void receiveQueueBindOk(int channelId);
-
- void receiveQueueUnbindOk(final int channelId);
-
- void receiveQueueDeclare(int channelId,
- AMQShortString queue,
- boolean passive,
- boolean durable,
- boolean exclusive,
- boolean autoDelete, boolean nowait, FieldTable arguments);
-
- void receiveQueueDeclareOk(int channelId, final AMQShortString queue, long messageCount, long consumerCount);
-
- void receiveQueueBind(int channelId,
- AMQShortString queue,
- AMQShortString exchange,
- AMQShortString bindingKey,
- boolean nowait, FieldTable arguments);
-
- void receiveQueuePurge(int channelId, AMQShortString queue, boolean nowait);
-
- void receiveQueuePurgeOk(int channelId, long messageCount);
-
- void receiveQueueDelete(int channelId, AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait);
-
- void receiveQueueDeleteOk(int channelId, long messageCount);
-
- void receiveQueueUnbind(int channelId,
- AMQShortString queue,
- AMQShortString exchange,
- AMQShortString bindingKey,
- FieldTable arguments);
-
- void receiveBasicRecoverSyncOk(int channelId);
-
- void receiveBasicRecover(int channelId, final boolean requeue, boolean sync);
-
- void receiveBasicQos(int channelId, long prefetchSize, int prefetchCount, boolean global);
-
- void receiveBasicQosOk(int channelId);
-
- void receiveBasicConsume(int channelId,
- AMQShortString queue,
- AMQShortString consumerTag,
- boolean noLocal,
- boolean noAck,
- boolean exclusive, boolean nowait, FieldTable arguments);
-
- void receiveBasicConsumeOk(int channelId, AMQShortString consumerTag);
-
- void receiveBasicCancel(int channelId, AMQShortString consumerTag, boolean noWait);
-
- void receiveBasicCancelOk(int channelId, AMQShortString consumerTag);
-
- void receiveBasicPublish(int channelId,
- AMQShortString exchange,
- AMQShortString routingKey,
- boolean mandatory,
- boolean immediate);
-
- void receiveBasicReturn(final int channelId,
- int replyCode,
- AMQShortString replyText,
- AMQShortString exchange,
- AMQShortString routingKey);
-
- void receiveBasicDeliver(int channelId,
- AMQShortString consumerTag,
- long deliveryTag,
- boolean redelivered,
- AMQShortString exchange, AMQShortString routingKey);
-
- void receiveBasicGet(int channelId, AMQShortString queue, boolean noAck);
-
- void receiveBasicGetOk(int channelId,
- long deliveryTag,
- boolean redelivered,
- AMQShortString exchange,
- AMQShortString routingKey, long messageCount);
-
- void receiveBasicGetEmpty(int channelId);
-
- void receiveBasicAck(int channelId, long deliveryTag, boolean multiple);
-
- void receiveBasicReject(int channelId, long deliveryTag, boolean requeue);
-
void receiveHeartbeat();
- void receiveMessageContent(int channelId, byte[] data);
-
- void receiveMessageHeader(int channelId, BasicContentHeaderProperties properties, long bodySize);
-
void receiveProtocolHeader(ProtocolInitiation protocolInitiation);
void setCurrentMethod(int classId, int methodId);
+
+ boolean ignoreAllButCloseOk();
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java
index e4419f77e3..2b7e26a7f0 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java
@@ -165,9 +165,8 @@ public class QueueBindBody extends AMQMethodBodyImpl implements EncodableAMQData
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput buffer,
- final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException
+ public static void process(final MarkableDataInput buffer,
+ final ServerChannelMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException
{
int ticket = buffer.readUnsignedShort();
@@ -176,6 +175,9 @@ public class QueueBindBody extends AMQMethodBodyImpl implements EncodableAMQData
AMQShortString bindingKey = buffer.readAMQShortString();
boolean nowait = (buffer.readByte() & 0x01) == 0x01;
FieldTable arguments = EncodingUtils.readFieldTable(buffer);
- dispatcher.receiveQueueBind(channelId, queue, exchange, bindingKey, nowait, arguments);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveQueueBind(queue, exchange, bindingKey, nowait, arguments);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java
index 1f9888c76a..5a359dc8df 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java
@@ -191,9 +191,8 @@ public class QueueDeclareBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput buffer,
- final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException
+ public static void process(final MarkableDataInput buffer,
+ final ServerChannelMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException
{
int ticket = buffer.readUnsignedShort();
@@ -206,6 +205,9 @@ public class QueueDeclareBody extends AMQMethodBodyImpl implements EncodableAMQD
boolean autoDelete = (bitfield & 0x08 ) == 0x08;
boolean nowait = (bitfield & 0x010 ) == 0x010;
FieldTable arguments = EncodingUtils.readFieldTable(buffer);
- dispatcher.receiveQueueDeclare(channelId, queue, passive, durable, exclusive, autoDelete, nowait, arguments);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveQueueDeclare(queue, passive, durable, exclusive, autoDelete, nowait, arguments);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java
index 9857bb3a39..cf6fc656b3 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java
@@ -120,13 +120,15 @@ public class QueueDeclareOkBody extends AMQMethodBodyImpl implements EncodableAM
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput buffer,
- final MethodProcessor dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer,
+ final ClientChannelMethodProcessor dispatcher) throws IOException
{
AMQShortString queue = buffer.readAMQShortString();
long messageCount = EncodingUtils.readUnsignedInteger(buffer);
long consumerCount = EncodingUtils.readUnsignedInteger(buffer);
- dispatcher.receiveQueueDeclareOk(channelId, queue, messageCount, consumerCount);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveQueueDeclareOk(queue, messageCount, consumerCount);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java
index 408f9f9667..ea933dc644 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java
@@ -151,9 +151,8 @@ public class QueueDeleteBody extends AMQMethodBodyImpl implements EncodableAMQDa
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput buffer,
- final MethodProcessor dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer,
+ final ServerChannelMethodProcessor dispatcher) throws IOException
{
int ticket = buffer.readUnsignedShort();
@@ -163,6 +162,9 @@ public class QueueDeleteBody extends AMQMethodBodyImpl implements EncodableAMQDa
boolean ifUnused = (bitfield & 0x01) == 0x01;
boolean ifEmpty = (bitfield & 0x02) == 0x02;
boolean nowait = (bitfield & 0x04) == 0x04;
- dispatcher.receiveQueueDelete(channelId, queue, ifUnused, ifEmpty, nowait);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveQueueDelete(queue, ifUnused, ifEmpty, nowait);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java
index b43369b68a..6d50153c15 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java
@@ -95,11 +95,13 @@ public class QueueDeleteOkBody extends AMQMethodBodyImpl implements EncodableAMQ
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput buffer,
- final MethodProcessor dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer,
+ final ClientChannelMethodProcessor dispatcher) throws IOException
{
long messageCount = EncodingUtils.readUnsignedInteger(buffer);
- dispatcher.receiveQueueDeleteOk(channelId, messageCount);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveQueueDeleteOk(messageCount);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java
index 5a04e21355..58a424387c 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java
@@ -125,14 +125,16 @@ public class QueuePurgeBody extends AMQMethodBodyImpl implements EncodableAMQDat
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput buffer,
- final MethodProcessor dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer,
+ final ServerChannelMethodProcessor dispatcher) throws IOException
{
int ticket = buffer.readUnsignedShort();
AMQShortString queue = buffer.readAMQShortString();
boolean nowait = (buffer.readByte() & 0x01) == 0x01;
- dispatcher.receiveQueuePurge(channelId, queue, nowait);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveQueuePurge(queue, nowait);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java
index 40cac8b390..acab2bc052 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java
@@ -95,11 +95,13 @@ public class QueuePurgeOkBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput buffer,
- final MethodProcessor dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer,
+ final ClientChannelMethodProcessor dispatcher) throws IOException
{
long messageCount = EncodingUtils.readUnsignedInteger(buffer);
- dispatcher.receiveQueuePurgeOk(channelId, messageCount);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveQueuePurgeOk(messageCount);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java
index a6f3e5b4c5..30c5d19d27 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java
@@ -147,9 +147,8 @@ public class QueueUnbindBody extends AMQMethodBodyImpl implements EncodableAMQDa
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput buffer,
- final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException
+ public static void process(final MarkableDataInput buffer,
+ final ServerChannelMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException
{
int ticket = buffer.readUnsignedShort();
@@ -157,6 +156,9 @@ public class QueueUnbindBody extends AMQMethodBodyImpl implements EncodableAMQDa
AMQShortString exchange = buffer.readAMQShortString();
AMQShortString routingKey = buffer.readAMQShortString();
FieldTable arguments = EncodingUtils.readFieldTable(buffer);
- dispatcher.receiveQueueUnbind(channelId, queue, exchange, routingKey, arguments);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveQueueUnbind(queue, exchange, routingKey, arguments);
+ }
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java
index d4c7f151e7..89b75c2d2f 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessor.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java
@@ -18,92 +18,75 @@
* under the License.
*
*/
-package org.apache.qpid.server.protocol.v0_8;
+package org.apache.qpid.framing;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-
-public interface ChannelMethodProcessor
+public interface ServerChannelMethodProcessor extends ChannelMethodProcessor
{
void receiveAccessRequest(AMQShortString realm,
boolean exclusive,
boolean passive,
boolean active,
- boolean write,
- boolean read);
-
- void receiveBasicAck(long deliveryTag, boolean multiple);
-
- void receiveBasicCancel(AMQShortString consumerTag, boolean nowait);
-
- void receiveBasicConsume(AMQShortString queue,
- AMQShortString consumerTag,
- boolean noLocal,
- boolean noAck,
- boolean exclusive,
- boolean nowait,
- FieldTable arguments);
-
- void receiveBasicGet(AMQShortString queue, boolean noAck);
-
- void receiveBasicPublish(AMQShortString exchange,
- AMQShortString routingKey,
- boolean mandatory,
- boolean immediate);
-
- void receiveBasicQos(long prefetchSize, int prefetchCount, boolean global);
-
- void receiveBasicRecover(boolean requeue, boolean sync);
-
- void receiveBasicReject(long deliveryTag, boolean requeue);
-
- void receiveChannelClose();
-
- void receiveChannelCloseOk();
-
- void receiveChannelFlow(boolean active);
-
- void receiveExchangeBound(AMQShortString exchange, AMQShortString queue, AMQShortString routingKey);
+ boolean write, boolean read);
void receiveExchangeDeclare(AMQShortString exchange,
AMQShortString type,
boolean passive,
boolean durable,
- boolean autoDelete,
- boolean internal,
- boolean nowait,
- FieldTable arguments);
+ boolean autoDelete, boolean internal, boolean nowait, final FieldTable arguments);
void receiveExchangeDelete(AMQShortString exchange, boolean ifUnused, boolean nowait);
- void receiveQueueBind(AMQShortString queue,
- AMQShortString exchange,
- AMQShortString routingKey,
- boolean nowait,
- FieldTable arguments);
+ void receiveExchangeBound(AMQShortString exchange, AMQShortString routingKey, AMQShortString queue);
- void receiveQueueDeclare(AMQShortString queueStr,
+ void receiveQueueDeclare(AMQShortString queue,
boolean passive,
boolean durable,
boolean exclusive,
- boolean autoDelete,
- boolean nowait,
- FieldTable arguments);
+ boolean autoDelete, boolean nowait, FieldTable arguments);
- void receiveQueueDelete(AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait);
+ void receiveQueueBind(AMQShortString queue,
+ AMQShortString exchange,
+ AMQShortString bindingKey,
+ boolean nowait, FieldTable arguments);
void receiveQueuePurge(AMQShortString queue, boolean nowait);
+ void receiveQueueDelete(AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait);
+
void receiveQueueUnbind(AMQShortString queue,
AMQShortString exchange,
- AMQShortString routingKey,
+ AMQShortString bindingKey,
FieldTable arguments);
+ void receiveBasicRecover(final boolean requeue, boolean sync);
+
+ void receiveBasicQos(long prefetchSize, int prefetchCount, boolean global);
+
+ void receiveBasicConsume(AMQShortString queue,
+ AMQShortString consumerTag,
+ boolean noLocal,
+ boolean noAck,
+ boolean exclusive, boolean nowait, FieldTable arguments);
+
+ void receiveBasicCancel(AMQShortString consumerTag, boolean noWait);
+
+ void receiveBasicPublish(AMQShortString exchange,
+ AMQShortString routingKey,
+ boolean mandatory,
+ boolean immediate);
+
+ void receiveBasicGet(AMQShortString queue, boolean noAck);
+
+ void receiveBasicAck(long deliveryTag, boolean multiple);
+
+ void receiveBasicReject(long deliveryTag, boolean requeue);
+
+
+
void receiveTxSelect();
void receiveTxCommit();
void receiveTxRollback();
-
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConnectionMethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/ServerMethodProcessor.java
index 6e657c022e..77b4a1fc6b 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConnectionMethodProcessor.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ServerMethodProcessor.java
@@ -18,27 +18,22 @@
* under the License.
*
*/
-package org.apache.qpid.server.protocol.v0_8;
+package org.apache.qpid.framing;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-
-public interface ConnectionMethodProcessor
+public interface ServerMethodProcessor<T extends ServerChannelMethodProcessor> extends MethodProcessor<T>
{
- void receiveChannelOpen(int channelId);
-
- void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist);
-
- void receiveConnectionClose(int replyCode, AMQShortString replyText, int classId, int methodId);
-
- void receiveConnectionCloseOk();
-
- void receiveConnectionSecureOk(byte[] response);
-
void receiveConnectionStartOk(FieldTable clientProperties,
AMQShortString mechanism,
byte[] response,
AMQShortString locale);
+ void receiveConnectionSecureOk(byte[] response);
+
void receiveConnectionTuneOk(int channelMax, long frameMax, int heartbeat);
+
+ void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist);
+
+ void receiveChannelOpen(int channelId);
+
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java b/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java
index bd3e9bbcbc..61d5f0629c 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java
@@ -20,10 +20,10 @@
*/
package org.apache.qpid.transport.util;
-import java.nio.ByteBuffer;
-
import static java.lang.Math.min;
+import java.nio.ByteBuffer;
+
/**
* Functions
@@ -33,6 +33,9 @@ import static java.lang.Math.min;
public final class Functions
{
+ private static final char[] HEX_CHARACTERS =
+ {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
+
private Functions()
{
}
@@ -102,4 +105,21 @@ public final class Functions
return str(ByteBuffer.wrap(bytes), limit);
}
+ public static String hex(byte[] bytes, int limit)
+ {
+ limit = Math.min(limit, bytes == null ? 0 : bytes.length);
+ StringBuilder sb = new StringBuilder(3 + limit*2);
+ for(int i = 0; i < limit; i++)
+ {
+ sb.append(HEX_CHARACTERS[(((int)bytes[i]) & 0xf0)>>4]);
+ sb.append(HEX_CHARACTERS[(((int)bytes[i]) & 0x0f)]);
+
+ }
+ if(bytes != null && bytes.length>limit)
+ {
+ sb.append("...");
+ }
+ return sb.toString();
+ }
+
}
diff --git a/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java b/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
index 63696515c6..51f3ce1113 100644
--- a/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
+++ b/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
@@ -47,7 +47,7 @@ public class AMQDecoderTest extends TestCase
public void setUp()
{
_methodProcessor = new FrameCreatingMethodProcessor(ProtocolVersion.v0_91);
- _decoder = new AMQDecoder(false, _methodProcessor);
+ _decoder = new ClientDecoder(_methodProcessor);
}
diff --git a/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java b/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
index b4a8155978..f76203887c 100644
--- a/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
+++ b/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
@@ -40,13 +41,13 @@ import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
+import org.apache.qpid.codec.AMQDecoder;
+import org.apache.qpid.codec.ClientDecoder;
import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQDataBlockDecoder;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.framing.AMQProtocolVersionException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ByteArrayDataInput;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ConnectionStartOkBody;
import org.apache.qpid.framing.ConnectionTuneOkBody;
@@ -234,14 +235,9 @@ public class MaxFrameSizeTest extends QpidBrokerTestCase
}
byte[] serverData = baos.toByteArray();
- ByteArrayDataInput badi = new ByteArrayDataInput(serverData);
- AMQDataBlockDecoder datablockDecoder = new AMQDataBlockDecoder();
final FrameCreatingMethodProcessor methodProcessor = new FrameCreatingMethodProcessor(ProtocolVersion.v0_91);
-
- while (datablockDecoder.decodable(badi))
- {
- datablockDecoder.processInput(methodProcessor, badi);
- }
+ AMQDecoder decoder = new ClientDecoder(methodProcessor);
+ decoder.decodeBuffer(ByteBuffer.wrap(serverData));
evaluator.evaluate(socket, methodProcessor.getProcessedMethods());
}