diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-10-13 00:58:45 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-10-13 00:58:45 +0000 |
commit | 1e437d92f66da4ef0dffbfb85e9e66e5b4f4f980 (patch) | |
tree | fc7be07855ef97588f8af0bbe53d79107a9d5544 | |
parent | b71808f6e2d65056b3cded958012ad1d96cd7391 (diff) | |
download | qpid-python-1e437d92f66da4ef0dffbfb85e9e66e5b4f4f980.tar.gz |
Migrate broker to new direct method dispatch mechanism
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6125-ProtocolRefactoring@1631275 13f79535-47bb-0310-9956-ffa450edef68
67 files changed, 1774 insertions, 3343 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 27fa654843..4087b1f4a0 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -115,7 +115,7 @@ import org.apache.qpid.transport.TransportException; public class AMQChannel implements AMQSessionModel<AMQChannel, AMQProtocolEngine>, AsyncAutoCommitTransaction.FutureRecorder, - ChannelMethodProcessor + ServerChannelMethodProcessor { public static final int DEFAULT_PREFETCH = 4096; @@ -376,27 +376,18 @@ public class AMQChannel } public void publishContentHeader(ContentHeaderBody contentHeaderBody) - throws AMQException { - if (_currentMessage == null) + if (_logger.isDebugEnabled()) { - throw new AMQException("Received content header without previously receiving a BasicPublish frame"); + _logger.debug("Content header received on channel " + _channelId); } - else - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Content header received on channel " + _channelId); - } - _currentMessage.setContentHeaderBody(contentHeaderBody); + _currentMessage.setContentHeaderBody(contentHeaderBody); - deliverCurrentMessageIfComplete(); - } + deliverCurrentMessageIfComplete(); } private void deliverCurrentMessageIfComplete() - throws AMQException { // check and deliver if header says body length is zero if (_currentMessage.allContentReceived()) @@ -497,7 +488,7 @@ public class AMQChannel * @throws AMQConnectionException if the message is mandatory close-on-no-route * @see AMQProtocolEngine#isCloseWhenNoRoute() */ - private void handleUnroutableMessage(AMQMessage message) throws AMQConnectionException + private void handleUnroutableMessage(AMQMessage message) { boolean mandatory = message.isMandatory(); String description = currentMessageDescription(); @@ -512,26 +503,27 @@ public class AMQChannel if (mandatory && isTransactional() && _connection.isCloseWhenNoRoute()) { - throw new AMQConnectionException( - AMQConstant.NO_ROUTE, - "No route for message " + currentMessageDescription(), - 0, 0, // default class and method ids - getConnection().getMethodRegistry(), - (Throwable) null); - } - - if (mandatory || message.isImmediate()) - { - _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " + currentMessageDescription(), message)); + _connection.closeConnection(AMQConstant.NO_ROUTE, + "No route for message " + currentMessageDescription(), _channelId); } else { - AMQShortString exchangeName = _currentMessage.getExchangeName(); - AMQShortString routingKey = _currentMessage.getMessagePublishInfo().getRoutingKey(); + if (mandatory || message.isImmediate()) + { + _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, + "No Route for message " + + currentMessageDescription(), + message)); + } + else + { + AMQShortString exchangeName = _currentMessage.getExchangeName(); + AMQShortString routingKey = _currentMessage.getMessagePublishInfo().getRoutingKey(); - getVirtualHost().getEventLogger().message( - ExchangeMessages.DISCARDMSG(exchangeName == null ? null : exchangeName.asString(), - routingKey == null ? null : routingKey.asString())); + getVirtualHost().getEventLogger().message( + ExchangeMessages.DISCARDMSG(exchangeName == null ? null : exchangeName.asString(), + routingKey == null ? null : routingKey.asString())); + } } } @@ -550,13 +542,8 @@ public class AMQChannel : _currentMessage.getMessagePublishInfo().getRoutingKey().toString()); } - public void publishContentBody(ContentBody contentBody) throws AMQException + public void publishContentBody(ContentBody contentBody) { - if (_currentMessage == null) - { - throw new AMQException("Received content body without previously receiving a Content Header"); - } - if (_logger.isDebugEnabled()) { _logger.debug(debugIdentity() + " content body received on channel " + _channelId); @@ -568,13 +555,6 @@ public class AMQChannel deliverCurrentMessageIfComplete(); } - catch (AMQException e) - { - // we want to make sure we don't keep a reference to the message in the - // event of an error - _currentMessage = null; - throw e; - } catch (RuntimeException e) { // we want to make sure we don't keep a reference to the message in the @@ -1277,14 +1257,10 @@ public class AMQChannel private AMQMessage createAMQMessage(IncomingMessage incomingMessage, StoredMessage<MessageMetaData> handle) - throws AMQException { AMQMessage message = new AMQMessage(handle, _connection.getReference()); - final BasicContentHeaderProperties properties = - incomingMessage.getContentHeader().getProperties(); - return message; } @@ -1340,6 +1316,11 @@ public class AMQChannel return _subject; } + public boolean hasCurrentMessage() + { + return _currentMessage != null; + } + private class GetDeliveryMethod implements ClientDeliveryMethod { @@ -2242,7 +2223,10 @@ public class AMQChannel } @Override - public void receiveChannelClose() + public void receiveChannelClose(final int replyCode, + final AMQShortString replyText, + final int classId, + final int methodId) { sync(); _connection.closeChannel(this); @@ -2258,6 +2242,43 @@ public class AMQChannel } @Override + public void receiveMessageContent(final byte[] data) + { + + if(hasCurrentMessage()) + { + publishContentBody(new ContentBody(data)); + } + else + { + _connection.closeConnection(AMQConstant.COMMAND_INVALID, + "Attempt to send a content header without first sending a publish frame", + _channelId); + } + } + + @Override + public void receiveMessageHeader(final BasicContentHeaderProperties properties, final long bodySize) + { + if(hasCurrentMessage()) + { + publishContentHeader(new ContentHeaderBody(properties, bodySize)); + } + else + { + _connection.closeConnection(AMQConstant.COMMAND_INVALID, + "Attempt to send a content header without first sending a publish frame", + _channelId); + } + } + + @Override + public boolean ignoreAllButCloseOk() + { + return _connection.ignoreAllButCloseOk() || _connection.channelAwaitingClosure(_channelId); + } + + @Override public void receiveChannelFlow(final boolean active) { sync(); @@ -2270,9 +2291,15 @@ public class AMQChannel } @Override + public void receiveChannelFlowOk(final boolean active) + { + // TODO - should we do anything here? + } + + @Override public void receiveExchangeBound(final AMQShortString exchangeName, - final AMQShortString queueName, - final AMQShortString routingKey) + final AMQShortString routingKey, + final AMQShortString queueName) { VirtualHostImpl virtualHost = _connection.getVirtualHost(); MethodRegistry methodRegistry = _connection.getMethodRegistry(); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index dce0c3128e..3a4e780db6 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -21,6 +21,9 @@ package org.apache.qpid.server.protocol.v0_8; import java.io.IOException; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -28,8 +31,6 @@ import java.security.AccessControlException; import java.security.AccessController; import java.security.Principal; import java.security.PrivilegedAction; -import java.security.PrivilegedActionException; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -49,7 +50,6 @@ import javax.security.sasl.SaslServer; import org.apache.log4j.Logger; -import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQDecoder; @@ -58,8 +58,6 @@ import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.framing.*; import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.connection.ConnectionPrincipal; @@ -93,8 +91,7 @@ import org.apache.qpid.util.BytesDataOutput; public class AMQProtocolEngine implements ServerProtocolEngine, AMQConnectionModel<AMQProtocolEngine, AMQChannel>, - AMQVersionAwareProtocolSession, - ConnectionMethodProcessor + ServerMethodProcessor<ServerChannelMethodProcessor> { private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class); @@ -114,9 +111,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private VirtualHostImpl<?,?,?> _virtualHost; private final Map<Integer, AMQChannel> _channelMap = - new HashMap<Integer, AMQChannel>(); + new HashMap<>(); private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners = - new CopyOnWriteArrayList<SessionModelListener>(); + new CopyOnWriteArrayList<>(); private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1]; @@ -128,7 +125,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, * Thread-safety: guarded by {@link #_receivedLock}. */ private final Set<AMQChannel> _channelsForCurrentMessage = - new HashSet<AMQChannel>(); + new HashSet<>(); private AMQDecoder _decoder; @@ -142,14 +139,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, /* AMQP Version for this session */ private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); private final MethodRegistry _methodRegistry = new MethodRegistry(_protocolVersion); - private final FrameCreatingMethodProcessor _methodProcessor = new FrameCreatingMethodProcessor(_protocolVersion); private final List<Action<? super AMQProtocolEngine>> _taskList = - new CopyOnWriteArrayList<Action<? super AMQProtocolEngine>>(); + new CopyOnWriteArrayList<>(); - private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>(); + private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<>(); private ProtocolOutputConverter _protocolOutputConverter; private final Subject _authorizedSubject = new Subject(); - private MethodDispatcher _dispatcher; private final long _connectionID; private Object _reference = new Object(); @@ -183,6 +178,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private boolean _authenticated; private boolean _compressionSupported; private int _messageCompressionThreshold; + private int _currentClassId; + private int _currentMethodId; public AMQProtocolEngine(Broker broker, final NetworkConnection network, @@ -195,7 +192,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _transport = transport; _maxNoOfChannels = broker.getConnection_sessionCountLimit(); _receivedLock = new ReentrantLock(); - _decoder = new AMQDecoder(true, _methodProcessor); + _decoder = new BrokerDecoder(this); _connectionID = connectionId; _logSubject = new ConnectionLogSubject(this); @@ -306,32 +303,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _readBytes += msg.remaining(); _receivedLock.lock(); - List<AMQDataBlock> processedMethods = _methodProcessor.getProcessedMethods(); try { _decoder.decodeBuffer(msg); - for (AMQDataBlock dataBlock : processedMethods) - { - try - { - dataBlockReceived(dataBlock); - } - catch(AMQConnectionException e) - { - if(_logger.isDebugEnabled()) - { - _logger.debug("Caught AMQConnectionException but will simply stop processing data blocks - the connection should already be closed.", e); - } - break; - } - catch (AMQException e) - { - _logger.error("Unexpected exception when processing datablock", e); - closeProtocolSession(); - break; - } - } - processedMethods.clear(); receivedComplete(); } catch (ConnectionScopedRuntimeException e) @@ -361,7 +335,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } finally { - processedMethods.clear(); _receivedLock.unlock(); } return null; @@ -399,112 +372,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } } - /** - * Process the data block. - * If the message is for a channel it is added to {@link #_channelsForCurrentMessage}. - * - * @throws AMQConnectionException if unable to process the data block. In this case, - * the connection is already closed by the time the exception is thrown. If any other - * type of exception is thrown, the connection is not already closed. - */ - private void dataBlockReceived(AMQDataBlock message) throws AMQException - { - if (message instanceof ProtocolInitiation) - { - protocolInitiationReceived((ProtocolInitiation) message); - - } - else if (message instanceof AMQFrame) - { - AMQFrame frame = (AMQFrame) message; - frameReceived(frame); - } - else - { - throw new AMQException("Unknown message type: " + message.getClass().getName() + ": " + message); - } - } - - /** - * Handle the supplied frame. - * Adds this frame's channel to {@link #_channelsForCurrentMessage}. - * - * @throws AMQConnectionException if unable to process the data block. In this case, - * the connection is already closed by the time the exception is thrown. If any other - * type of exception is thrown, the connection is not already closed. - */ - private void frameReceived(AMQFrame frame) throws AMQException + void channelRequiresSync(final AMQChannel amqChannel) { - int channelId = frame.getChannel(); - AMQChannel amqChannel = _channelMap.get(channelId); - if(amqChannel != null) - { - // The _receivedLock is already acquired in the caller - // It is safe to add channel - _channelsForCurrentMessage.add(amqChannel); - } - else - { - // Not an error. The frame is probably a channel Open for this channel id, which - // does not require asynchronous work therefore its absence from - // _channelsForCurrentMessage is ok. - } - - AMQBody body = frame.getBodyFrame(); - - long startTime = 0; - String frameToString = null; - if (_logger.isDebugEnabled()) - { - startTime = System.currentTimeMillis(); - frameToString = frame.toString(); - _logger.debug("RECV: " + frame); - } - - // Check that this channel is not closing - if (channelAwaitingClosure(channelId)) - { - if ((frame.getBodyFrame() instanceof ChannelCloseOkBody)) - { - if (_logger.isInfoEnabled()) - { - _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok"); - } - } - else - { - // The channel has been told to close, we don't process any more frames until - // it's closed. - return; - } - } - - try - { - body.handle(channelId, this); - } - catch(AMQConnectionException e) - { - _logger.info(e.getMessage() + " whilst processing frame: " + body); - closeConnection(channelId, e); - throw e; - } - catch (AMQException e) - { - closeChannel(channelId, e.getErrorCode() == null ? AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage()); - throw e; - } - catch (TransportException e) - { - closeChannel(channelId, AMQConstant.CHANNEL_ERROR, e.getMessage()); - throw e; - } - - if(_logger.isDebugEnabled()) - { - _logger.debug("Frame handled in " + (System.currentTimeMillis() - startTime) + " ms. Frame: " + frameToString); - } + _channelsForCurrentMessage.add(amqChannel); } private synchronized void protocolInitiationReceived(ProtocolInitiation pi) @@ -623,148 +494,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return buf; } - public void methodFrameReceived(int channelId, AMQMethodBody methodBody) - { - final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody); - - try - { - try - { - boolean wasAnyoneInterested = methodReceived(evt); - - if (!wasAnyoneInterested) - { - throw new AMQNoMethodHandlerException(evt); - } - } - catch (AMQChannelException e) - { - if (getChannel(channelId) != null) - { - if (_logger.isInfoEnabled()) - { - _logger.info("Closing channel due to: " + e.getMessage()); - } - - AMQConstant errorType = e.getErrorCode(); - if(errorType == null) - { - errorType = AMQConstant.INTERNAL_ERROR; - } - writeFrame(new AMQFrame(channelId, - getMethodRegistry().createChannelCloseBody(errorType.getCode(), - AMQShortString.validValueOf(e.getMessage()), - e.getClassId(), - e.getMethodId()))); - closeChannel(channelId, errorType, e.getMessage()); - } - else - { - if (_logger.isDebugEnabled()) - { - _logger.debug("ChannelException occurred on non-existent channel:" + e.getMessage()); - } - - if (_logger.isInfoEnabled()) - { - _logger.info("Closing connection due to: " + e.getMessage()); - } - - AMQConnectionException ce = new AMQConnectionException(AMQConstant.CHANNEL_ERROR, - AMQConstant.CHANNEL_ERROR.getName().toString(), - methodBody, getMethodRegistry()); - - _logger.info(e.getMessage() + " whilst processing:" + methodBody); - closeConnection(channelId, ce); - } - } - catch (AMQConnectionException e) - { - _logger.info(e.getMessage() + " whilst processing:" + methodBody); - closeConnection(channelId, e); - } - } - catch (Exception e) - { - _logger.error("Unexpected exception while processing frame. Closing connection.", e); - - closeProtocolSession(); - } - } - private <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException - { - final MethodDispatcher dispatcher = getMethodDispatcher(); - - final int channelId = evt.getChannelId(); - final B body = evt.getMethod(); - - final AMQChannel channel = getChannel(channelId); - if(channelId != 0 && channel == null) - { - - if(! ((body instanceof ChannelOpenBody) - || (body instanceof ChannelCloseOkBody) - || (body instanceof ChannelCloseBody))) - { - throw new AMQConnectionException(AMQConstant.CHANNEL_ERROR, "channel is closed won't process:" + body, body, getMethodRegistry()); - } - - } - if(channel == null) - { - return body.execute(dispatcher, channelId); - } - else - { - try - { - return Subject.doAs(channel.getSubject(), new PrivilegedExceptionAction<Boolean>() - { - @Override - public Boolean run() throws AMQException - { - return body.execute(dispatcher, channelId); - } - }); - } - catch (PrivilegedActionException e) - { - if(e.getCause() instanceof AMQException) - { - throw (AMQException) e.getCause(); - } - else - { - throw new ServerScopedRuntimeException(e.getCause()); - } - } - - - } - - } - - public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException - { - - AMQChannel channel = getAndAssertChannel(channelId); - - channel.publishContentHeader(body); - - } - - public void contentBodyReceived(int channelId, ContentBody body) throws AMQException - { - AMQChannel channel = getAndAssertChannel(channelId); - - channel.publishContentBody(body); - } - - public void heartbeatBodyReceived(int channelId, HeartbeatBody body) - { - // NO - OP - } /** * Convenience method that writes a frame to the protocol session. Equivalent to calling @@ -808,19 +537,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { synchronized (_channelMap) { - return new ArrayList<AMQChannel>(_channelMap.values()); - } - } - - public AMQChannel getAndAssertChannel(int channelId) throws AMQException - { - AMQChannel channel = getChannel(channelId); - if (channel == null) - { - throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId); + return new ArrayList<>(_channelMap.values()); } - - return channel; } public AMQChannel getChannel(int channelId) @@ -899,8 +617,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, writeFrame(new AMQFrame(channel.getChannelId(), getMethodRegistry().createChannelCloseBody(cause.getCode(), AMQShortString.validValueOf(message), - _methodProcessor.getClassId(), - _methodProcessor.getMethodId()))); + _currentClassId, + _currentMethodId))); closeChannel(channel, cause, message, true); } @@ -1106,7 +824,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { _logger.info("Closing connection due to: " + message); } - closeConnection(channelId, new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(), AMQShortString.validValueOf(message), _methodProcessor.getClassId(), _methodProcessor.getMethodId()))); + closeConnection(channelId, new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(), AMQShortString.validValueOf(message), _currentClassId, _currentMethodId))); } private void closeConnection(int channelId, AMQFrame frame) @@ -1224,9 +942,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { _protocolVersion = pv; _methodRegistry.setProtocolVersion(_protocolVersion); - _methodProcessor.setProtocolVersion(_protocolVersion); _protocolOutputConverter = new ProtocolOutputConverterImpl(this); - _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(this); } public byte getProtocolMajorVersion() @@ -1335,11 +1051,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return _methodRegistry; } - public MethodDispatcher getMethodDispatcher() - { - return _dispatcher; - } - public void closed() { try @@ -1353,14 +1064,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, closeProtocolSession(); } } - catch (ConnectionScopedRuntimeException e) + catch (ConnectionScopedRuntimeException | TransportException e) { _logger.error("Could not close protocol engine", e); } - catch (TransportException e) - { - _logger.error("Could not close protocol engine", e); - } } public void readerIdle() @@ -1427,11 +1134,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } } - public void setSender(Sender<ByteBuffer> sender) - { - // Do nothing - } - public long getReadBytes() { return _readBytes; @@ -1572,7 +1274,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, public List<AMQChannel> getSessionModels() { - return new ArrayList<AMQChannel>(getChannels()); + return new ArrayList<>(getChannels()); } public LogSubject getLogSubject() @@ -2074,4 +1776,52 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return _broker.getEventLogger(); } } + + @Override + public ServerChannelMethodProcessor getChannelMethodProcessor(final int channelId) + { + ServerChannelMethodProcessor channelMethodProcessor = getChannel(channelId); + if(channelMethodProcessor == null) + { + channelMethodProcessor = (ServerChannelMethodProcessor) Proxy.newProxyInstance(ServerMethodDispatcher.class.getClassLoader(), + new Class[] { ServerChannelMethodProcessor.class }, new InvocationHandler() + { + @Override + public Object invoke(final Object proxy, final Method method, final Object[] args) + throws Throwable + { + closeConnection(AMQConstant.CHANNEL_ERROR, "Unknown channel id: " + channelId, channelId); + + return null; + } + }); + } + return channelMethodProcessor; + } + + @Override + public void receiveHeartbeat() + { + // No op + } + + @Override + public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation) + { + protocolInitiationReceived(protocolInitiation); + } + + @Override + public void setCurrentMethod(final int classId, final int methodId) + { + _currentClassId = classId; + _currentMethodId = methodId; + } + + @Override + public boolean ignoreAllButCloseOk() + { + return _closing.get(); + } + } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java new file mode 100644 index 0000000000..5a4466b003 --- /dev/null +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import java.io.IOException; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; + +import javax.security.auth.Subject; + +import org.apache.qpid.codec.MarkableDataInput; +import org.apache.qpid.codec.ServerDecoder; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.server.util.ServerScopedRuntimeException; + +public class BrokerDecoder extends ServerDecoder +{ + private final AMQProtocolEngine _connection; + /** + * Creates a new AMQP decoder. + * + * @param connection + */ + public BrokerDecoder(final AMQProtocolEngine connection) + { + super(connection); + _connection = connection; + } + + @Override + protected void processFrame(final int channelId, final byte type, final long bodySize, final MarkableDataInput in) + throws AMQFrameDecodingException, IOException + { + Subject subject; + AMQChannel channel = _connection.getChannel(channelId); + if(channel == null) + { + subject = _connection.getSubject(); + } + else + { + _connection.channelRequiresSync(channel); + + subject = channel.getSubject(); + } + try + { + Subject.doAs(subject, new PrivilegedExceptionAction<Object>() + { + @Override + public Void run() throws IOException, AMQFrameDecodingException + { + doProcessFrame(channelId, type, bodySize, in); + return null; + } + }); + } + catch (PrivilegedActionException e) + { + Throwable cause = e.getCause(); + if(cause instanceof IOException) + { + throw (IOException) cause; + } + else if(cause instanceof AMQFrameDecodingException) + { + throw (AMQFrameDecodingException) cause; + } + else if(cause instanceof RuntimeException) + { + throw (RuntimeException) cause; + } + else throw new ServerScopedRuntimeException(cause); + } + + } + + + private void doProcessFrame(final int channelId, final byte type, final long bodySize, final MarkableDataInput in) + throws AMQFrameDecodingException, IOException + { + super.processFrame(channelId, type, bodySize, in); + + } + +} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java index 821d6101db..d966e9c9c6 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java @@ -20,16 +20,15 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.AMQException; +import java.util.ArrayList; +import java.util.List; + import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.MessagePublishInfo; import org.apache.qpid.server.message.MessageDestination; -import java.util.ArrayList; -import java.util.List; - public class IncomingMessage { @@ -58,7 +57,7 @@ public class IncomingMessage return _messagePublishInfo; } - public void addContentBodyFrame(final ContentBody contentChunk) throws AMQException + public void addContentBodyFrame(final ContentBody contentChunk) { _bodyLengthReceived += contentChunk.getSize(); _contentChunks.add(contentChunk); @@ -94,7 +93,7 @@ public class IncomingMessage _messageDestination = e; } - public int getBodyCount() throws AMQException + public int getBodyCount() { return _contentChunks.size(); } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java deleted file mode 100644 index b3ee5f9ff9..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java +++ /dev/null @@ -1,826 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import java.security.PrivilegedAction; - -import javax.security.auth.Subject; -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.*; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.model.Broker; - -public class ServerMethodDispatcherImpl implements MethodDispatcher -{ - private static final Logger _logger = Logger.getLogger(ServerMethodDispatcherImpl.class); - - private final AMQProtocolEngine _connection; - - - private static interface ChannelAction - { - void onChannel(ChannelMethodProcessor channel); - } - - - private static interface ConnectionAction - { - void onConnection(ConnectionMethodProcessor connection); - } - - - public static MethodDispatcher createMethodDispatcher(AMQProtocolEngine connection) - { - return new ServerMethodDispatcherImpl(connection); - } - - - public ServerMethodDispatcherImpl(AMQProtocolEngine connection) - { - _connection = connection; - } - - - protected final AMQProtocolEngine getConnection() - { - return _connection; - } - - private void processChannelMethod(int channelId, final ChannelAction action) - { - final AMQChannel channel = _connection.getChannel(channelId); - if (channel == null) - { - closeConnection(AMQConstant.CHANNEL_ERROR, "Unknown channel id: " + channelId); - } - else - { - Subject.doAs(channel.getSubject(), new PrivilegedAction<Void>() - { - @Override - public Void run() - { - action.onChannel(channel); - return null; - } - }); - } - - } - - private void processConnectionMethod(final ConnectionAction action) - { - Subject.doAs(_connection.getSubject(), new PrivilegedAction<Void>() - { - @Override - public Void run() - { - action.onConnection(_connection); - return null; - } - }); - - - } - - public boolean dispatchAccessRequest(final AccessRequestBody body, int channelId) - { - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveAccessRequest(body.getRealm(), - body.getExclusive(), - body.getPassive(), - body.getActive(), - body.getWrite(), - body.getRead()); - } - } - ); - - return true; - } - - public boolean dispatchBasicAck(final BasicAckBody body, int channelId) - { - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveBasicAck(body.getDeliveryTag(), body.getMultiple()); - } - } - ); - - return true; - } - - public boolean dispatchBasicCancel(final BasicCancelBody body, int channelId) - { - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveBasicCancel(body.getConsumerTag(), - body.getNowait() - ); - } - } - ); - return true; - } - - public boolean dispatchBasicConsume(final BasicConsumeBody body, int channelId) - { - - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveBasicConsume(body.getQueue(), body.getConsumerTag(), - body.getNoLocal(), body.getNoAck(), - body.getExclusive(), body.getNowait(), - body.getArguments()); - } - } - ); - - - return true; - } - - private void closeConnection(final AMQConstant constant, - final String message) - { - _connection.closeConnection(constant, message, 0); - } - - public boolean dispatchBasicGet(final BasicGetBody body, int channelId) - { - - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveBasicGet(body.getQueue(), body.getNoAck()); - } - } - ); - return true; - } - - public boolean dispatchBasicPublish(final BasicPublishBody body, int channelId) - { - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveBasicPublish(body.getExchange(), body.getRoutingKey(), - body.getMandatory(), body.getImmediate()); - } - } - ); - - return true; - } - - public boolean dispatchBasicQos(final BasicQosBody body, int channelId) - { - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveBasicQos(body.getPrefetchSize(), body.getPrefetchCount(), - body.getGlobal()); - } - } - ); - - return true; - } - - public boolean dispatchBasicRecover(final BasicRecoverBody body, int channelId) - { - final boolean sync = _connection.getProtocolVersion().equals(ProtocolVersion.v8_0); - - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveBasicRecover(body.getRequeue(), sync); - } - } - ); - - return true; - } - - public boolean dispatchBasicReject(final BasicRejectBody body, int channelId) - { - - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveBasicReject(body.getDeliveryTag(), body.getRequeue()); - } - } - ); - - return true; - } - - public boolean dispatchChannelOpen(ChannelOpenBody body, final int channelId) - { - processConnectionMethod(new ConnectionAction() - { - @Override - public void onConnection(final ConnectionMethodProcessor connection) - { - connection.receiveChannelOpen(channelId); - } - }); - return true; - } - - - public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - - public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchBasicConsumeOk(BasicConsumeOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchBasicGetEmpty(BasicGetEmptyBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchBasicGetOk(BasicGetOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchBasicQosOk(BasicQosOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) - { - - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveChannelClose(); - } - } - ); - - return true; - } - - - public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) - { - - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveChannelCloseOk(); - } - } - ); - - return true; - } - - - public boolean dispatchChannelFlow(final ChannelFlowBody body, int channelId) - { - - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveChannelFlow(body.getActive()); - } - } - ); - return true; - } - - public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchChannelOpenOk(ChannelOpenOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - - public boolean dispatchConnectionOpen(final ConnectionOpenBody body, int channelId) - { - processConnectionMethod(new ConnectionAction() - { - @Override - public void onConnection(final ConnectionMethodProcessor connection) - { - connection.receiveConnectionOpen(body.getVirtualHost(), body.getCapabilities(), body.getInsist()); - } - }); - - return true; - } - - - public boolean dispatchConnectionClose(final ConnectionCloseBody body, int channelId) - { - - processConnectionMethod(new ConnectionAction() - { - @Override - public void onConnection(final ConnectionMethodProcessor connection) - { - connection.receiveConnectionClose(body.getReplyCode(), - body.getReplyText(), - body.getClassId(), - body.getMethodId()); - } - }); - - return true; - } - - - public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) - { - - processConnectionMethod(new ConnectionAction() - { - @Override - public void onConnection(final ConnectionMethodProcessor connection) - { - connection.receiveConnectionCloseOk(); - } - }); - - return true; - } - - public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - - public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchExchangeDeclareOk(ExchangeDeclareOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchExchangeDeleteOk(ExchangeDeleteOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchQueueDeclareOk(QueueDeclareOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchQueuePurgeOk(QueuePurgeOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchTxRollbackOk(TxRollbackOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - - public boolean dispatchConnectionSecureOk(final ConnectionSecureOkBody body, int channelId) - { - - processConnectionMethod(new ConnectionAction() - { - @Override - public void onConnection(final ConnectionMethodProcessor connection) - { - connection.receiveConnectionSecureOk(body.getResponse()); - } - }); - - return true; - } - - private void disposeSaslServer(AMQProtocolEngine connection) - { - SaslServer ss = connection.getSaslServer(); - if (ss != null) - { - connection.setSaslServer(null); - try - { - ss.dispose(); - } - catch (SaslException e) - { - _logger.error("Error disposing of Sasl server: " + e); - } - } - } - - public boolean dispatchConnectionStartOk(final ConnectionStartOkBody body, int channelId) - { - - processConnectionMethod(new ConnectionAction() - { - @Override - public void onConnection(final ConnectionMethodProcessor connection) - { - connection.receiveConnectionStartOk(body.getClientProperties(), - body.getMechanism(), - body.getResponse(), - body.getLocale()); - } - }); - - return true; - } - - public boolean dispatchConnectionTuneOk(final ConnectionTuneOkBody body, int channelId) - { - - processConnectionMethod(new ConnectionAction() - { - @Override - public void onConnection(final ConnectionMethodProcessor connection) - { - connection.receiveConnectionTuneOk(body.getChannelMax(), - body.getFrameMax(), - body.getHeartbeat()); - } - }); - final AMQProtocolEngine connection = getConnection(); - - - return true; - } - - public boolean dispatchExchangeBound(final ExchangeBoundBody body, int channelId) - { - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveExchangeBound(body.getExchange(), body.getQueue(), body.getRoutingKey()); - } - } - ); - - return true; - } - - public boolean dispatchExchangeDeclare(final ExchangeDeclareBody body, int channelId) - { - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveExchangeDeclare(body.getExchange(), body.getType(), - body.getPassive(), - body.getDurable(), - body.getAutoDelete(), - body.getInternal(), - body.getNowait(), - body.getArguments()); - } - } - ); - - return true; - } - - public boolean dispatchExchangeDelete(final ExchangeDeleteBody body, int channelId) - { - - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveExchangeDelete(body.getExchange(), - body.getIfUnused(), - body.getNowait()); - } - } - ); - - return true; - } - - public boolean dispatchQueueBind(final QueueBindBody body, int channelId) - { - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveQueueBind(body.getQueue(), - body.getExchange(), - body.getRoutingKey(), - body.getNowait(), - body.getArguments()); - } - } - ); - - return true; - } - - public boolean dispatchQueueDeclare(final QueueDeclareBody body, int channelId) - { - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveQueueDeclare(body.getQueue(), - body.getPassive(), - body.getDurable(), - body.getExclusive(), - body.getAutoDelete(), - body.getNowait(), - body.getArguments()); - } - } - ); - - return true; - } - - public boolean dispatchQueueDelete(final QueueDeleteBody body, int channelId) - { - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveQueueDelete(body.getQueue(), - body.getIfUnused(), - body.getIfEmpty(), - body.getNowait()); - } - } - ); - - return true; - } - - public boolean dispatchQueuePurge(final QueuePurgeBody body, int channelId) - { - - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveQueuePurge(body.getQueue(), - body.getNowait()); - } - } - ); - - return true; - } - - - public boolean dispatchTxCommit(TxCommitBody body, final int channelId) - { - - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveTxCommit(); - } - } - ); - - return true; - } - - public boolean dispatchTxRollback(TxRollbackBody body, final int channelId) - { - - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveTxRollback(); - } - } - ); - return true; - } - - public boolean dispatchTxSelect(TxSelectBody body, int channelId) - { - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveTxSelect(); - } - } - ); - return true; - } - - public boolean dispatchBasicRecoverSync(final BasicRecoverSyncBody body, int channelId) - { - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveBasicRecover(body.getRequeue(), true); - } - } - ); - - return true; - } - - public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - @Override - public boolean dispatchChannelAlert(final ChannelAlertBody body, final int channelId) - throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchQueueUnbind(final QueueUnbindBody body, int channelId) - { - - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveQueueUnbind(body.getQueue(), - body.getExchange(), - body.getRoutingKey(), - body.getArguments()); - } - } - ); - - return true; - } - -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java deleted file mode 100644 index 625836bcf2..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java +++ /dev/null @@ -1,964 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import java.security.PrivilegedAction; - -import javax.security.auth.Subject; -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; - -import org.apache.log4j.Logger; - -import org.apache.qpid.framing.*; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.security.SubjectCreator; -import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; - -public class ServerMethodProcessor implements MethodProcessor -{ - private static final Logger LOGGER = Logger.getLogger(ServerMethodProcessor.class); - private int _classId; - private int _methodId; - - - private static interface ChannelAction - { - void onChannel(ChannelMethodProcessor channel); - } - - private ProtocolVersion _protocolVersion; - private ServerMethodDispatcherImpl _dispatcher; - private AMQProtocolEngine _connection; - - public ServerMethodProcessor(final ProtocolVersion protocolVersion) - { - _protocolVersion = protocolVersion; - } - - - private void processChannelMethod(int channelId, final ChannelAction action) - { - final AMQChannel channel = _connection.getChannel(channelId); - if (channel == null) - { - // TODO throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - else - { - Subject.doAs(channel.getSubject(), new PrivilegedAction<Void>() - { - @Override - public Void run() - { - action.onChannel(channel); - return null; - } - }); - } - - } - - @Override - public void receiveConnectionStart(final short versionMajor, - final short versionMinor, - final FieldTable serverProperties, - final byte[] mechanisms, - final byte[] locales) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(0, - new ConnectionStartBody(versionMajor, - versionMinor, - serverProperties, - mechanisms, - locales)); - } - _connection.closeConnection(AMQConstant.COMMAND_INVALID, "Unexpected method received: ConnectionStart", 0 - ); - - } - - @Override - public void receiveConnectionStartOk(final FieldTable clientProperties, - final AMQShortString mechanism, - final byte[] response, - final AMQShortString locale) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(0, new ConnectionStartOkBody(clientProperties, mechanism, response, locale)); - } - - Broker<?> broker = _connection.getBroker(); - - SubjectCreator subjectCreator = _connection.getSubjectCreator(); - SaslServer ss = null; - try - { - ss = subjectCreator.createSaslServer(String.valueOf(mechanism), - _connection.getLocalFQDN(), - _connection.getPeerPrincipal()); - - if (ss == null) - { - _connection.closeConnection(AMQConstant.RESOURCE_ERROR, - "Unable to create SASL Server:" + mechanism, 0 - ); - } - else - { - _connection.setSaslServer(ss); - - final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, response); - //save clientProperties - _connection.setClientProperties(clientProperties); - - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - - switch (authResult.getStatus()) - { - case ERROR: - Exception cause = authResult.getCause(); - - LOGGER.info("Authentication failed:" + (cause == null ? "" : cause.getMessage())); - - _connection.closeConnection(AMQConstant.NOT_ALLOWED, - AMQConstant.NOT_ALLOWED.getName().toString(), 0 - ); - - disposeSaslServer(); - break; - - case SUCCESS: - if (LOGGER.isInfoEnabled()) - { - LOGGER.info("Connected as: " + authResult.getSubject()); - } - _connection.setAuthorizedSubject(authResult.getSubject()); - - int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); - - if (frameMax <= 0) - { - frameMax = Integer.MAX_VALUE; - } - - ConnectionTuneBody - tuneBody = - methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), - frameMax, - broker.getConnection_heartBeatDelay()); - _connection.writeFrame(tuneBody.generateFrame(0)); - break; - case CONTINUE: - ConnectionSecureBody - secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge()); - _connection.writeFrame(secureBody.generateFrame(0)); - } - } - } - catch (SaslException e) - { - disposeSaslServer(); - - _connection.closeConnection(AMQConstant.RESOURCE_ERROR, "SASL error: " + e.getMessage(), 0 - ); - } - - } - - @Override - public void receiveTxSelect(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, TxSelectBody.INSTANCE); - } - - } - - @Override - public void receiveTxSelectOk(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, TxSelectOkBody.INSTANCE); - } - - } - - @Override - public void receiveTxCommit(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, TxCommitBody.INSTANCE); - } - - } - - @Override - public void receiveTxCommitOk(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, TxCommitOkBody.INSTANCE); - } - - } - - @Override - public void receiveTxRollback(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, TxRollbackBody.INSTANCE); - } - - } - - @Override - public void receiveTxRollbackOk(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, TxRollbackOkBody.INSTANCE); - } - - } - - @Override - public void receiveConnectionSecure(final byte[] challenge) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(0, new ConnectionSecureBody(challenge)); - } - - } - - @Override - public void receiveConnectionSecureOk(final byte[] response) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(0, new ConnectionSecureOkBody(response)); - } - - } - - @Override - public void receiveConnectionTune(final int channelMax, final long frameMax, final int heartbeat) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(0, new ConnectionTuneBody(channelMax, frameMax, heartbeat)); - } - - } - - @Override - public void receiveConnectionTuneOk(final int channelMax, final long frameMax, final int heartbeat) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(0, new ConnectionTuneOkBody(channelMax, frameMax, heartbeat)); - } - - } - - @Override - public void receiveConnectionOpen(final AMQShortString virtualHost, - final AMQShortString capabilities, - final boolean insist) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(0, new ConnectionOpenBody(virtualHost, capabilities, insist)); - } - - } - - @Override - public void receiveConnectionOpenOk(final AMQShortString knownHosts) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(0, new ConnectionOpenOkBody(knownHosts)); - } - - } - - @Override - public void receiveConnectionRedirect(final AMQShortString host, final AMQShortString knownHosts) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(0, new ConnectionRedirectBody(getProtocolVersion(), host, knownHosts)); - } - - } - - @Override - public void receiveConnectionClose(final int replyCode, - final AMQShortString replyText, - final int classId, - final int methodId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(0, - new ConnectionCloseBody(getProtocolVersion(), - replyCode, - replyText, - classId, - methodId)); - } - - } - - @Override - public void receiveConnectionCloseOk() - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(0, ProtocolVersion.v8_0.equals(getProtocolVersion()) - ? ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_8 - : ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_9); - } - } - - @Override - public void receiveChannelOpen(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new ChannelOpenBody()); - } - - } - - @Override - public void receiveChannelOpenOk(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, ProtocolVersion.v8_0.equals(getProtocolVersion()) - ? ChannelOpenOkBody.INSTANCE_0_8 - : ChannelOpenOkBody.INSTANCE_0_9); - } - } - - @Override - public void receiveChannelFlow(final int channelId, final boolean active) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new ChannelFlowBody(active)); - } - - } - - @Override - public void receiveChannelFlowOk(final int channelId, final boolean active) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new ChannelFlowOkBody(active)); - } - - } - - @Override - public void receiveChannelAlert(final int channelId, - final int replyCode, - final AMQShortString replyText, - final FieldTable details) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new ChannelAlertBody(replyCode, replyText, details)); - } - - } - - @Override - public void receiveChannelClose(final int channelId, - final int replyCode, - final AMQShortString replyText, - final int classId, - final int methodId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new ChannelCloseBody(replyCode, replyText, classId, methodId)); - } - - } - - @Override - public void receiveChannelCloseOk(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, ChannelCloseOkBody.INSTANCE); - } - - } - - @Override - public void receiveAccessRequest(final int channelId, - final AMQShortString realm, - final boolean exclusive, - final boolean passive, - final boolean active, - final boolean write, - final boolean read) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = - new AMQFrame(channelId, new AccessRequestBody(realm, exclusive, passive, active, write, read)); - } - - } - - @Override - public void receiveAccessRequestOk(final int channelId, final int ticket) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new AccessRequestOkBody(ticket)); - } - - } - - @Override - public void receiveExchangeDeclare(final int channelId, - final AMQShortString exchange, - final AMQShortString type, - final boolean passive, - final boolean durable, - final boolean autoDelete, - final boolean internal, - final boolean nowait, final FieldTable arguments) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, - new ExchangeDeclareBody(0, - exchange, - type, - passive, - durable, - autoDelete, - internal, - nowait, - arguments)); - } - - } - - @Override - public void receiveExchangeDeclareOk(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new ExchangeDeclareOkBody()); - } - - } - - @Override - public void receiveExchangeDelete(final int channelId, - final AMQShortString exchange, - final boolean ifUnused, - final boolean nowait) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new ExchangeDeleteBody(0, exchange, ifUnused, nowait)); - } - - } - - @Override - public void receiveExchangeDeleteOk(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new ExchangeDeleteOkBody()); - } - - } - - @Override - public void receiveExchangeBound(final int channelId, - final AMQShortString exchange, - final AMQShortString routingKey, - final AMQShortString queue) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new ExchangeBoundBody(exchange, routingKey, queue)); - } - - } - - @Override - public void receiveExchangeBoundOk(final int channelId, final int replyCode, final AMQShortString replyText) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new ExchangeBoundOkBody(replyCode, replyText)); - } - - } - - @Override - public void receiveQueueBindOk(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new QueueBindOkBody()); - } - - } - - @Override - public void receiveQueueUnbindOk(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new QueueUnbindOkBody()); - } - - } - - @Override - public void receiveQueueDeclare(final int channelId, - final AMQShortString queue, - final boolean passive, - final boolean durable, - final boolean exclusive, - final boolean autoDelete, - final boolean nowait, - final FieldTable arguments) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, - new QueueDeclareBody(0, - queue, - passive, - durable, - exclusive, - autoDelete, - nowait, - arguments)); - } - - } - - @Override - public void receiveQueueDeclareOk(final int channelId, - final AMQShortString queue, - final long messageCount, - final long consumerCount) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new QueueDeclareOkBody(queue, messageCount, consumerCount)); - } - - } - - @Override - public void receiveQueueBind(final int channelId, - final AMQShortString queue, - final AMQShortString exchange, - final AMQShortString bindingKey, - final boolean nowait, - final FieldTable arguments) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = - new AMQFrame(channelId, new QueueBindBody(0, queue, exchange, bindingKey, nowait, arguments)); - } - - } - - @Override - public void receiveQueuePurge(final int channelId, final AMQShortString queue, final boolean nowait) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new QueuePurgeBody(0, queue, nowait)); - } - - } - - @Override - public void receiveQueuePurgeOk(final int channelId, final long messageCount) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new QueuePurgeOkBody(messageCount)); - } - - } - - @Override - public void receiveQueueDelete(final int channelId, - final AMQShortString queue, - final boolean ifUnused, - final boolean ifEmpty, - final boolean nowait) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new QueueDeleteBody(0, queue, ifUnused, ifEmpty, nowait)); - } - - } - - @Override - public void receiveQueueDeleteOk(final int channelId, final long messageCount) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new QueueDeleteOkBody(messageCount)); - } - - } - - @Override - public void receiveQueueUnbind(final int channelId, - final AMQShortString queue, - final AMQShortString exchange, - final AMQShortString bindingKey, - final FieldTable arguments) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new QueueUnbindBody(0, queue, exchange, bindingKey, arguments)); - } - - } - - @Override - public void receiveBasicRecoverSyncOk(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new BasicRecoverSyncOkBody(getProtocolVersion())); - } - - } - - @Override - public void receiveBasicRecover(final int channelId, final boolean requeue, final boolean sync) - { - if (ProtocolVersion.v8_0.equals(getProtocolVersion()) || !sync) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new BasicRecoverBody(requeue)); - } - - } - else - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new BasicRecoverSyncBody(getProtocolVersion(), requeue)); - } - - } - } - - @Override - public void receiveBasicQos(final int channelId, - final long prefetchSize, - final int prefetchCount, - final boolean global) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new BasicQosBody(prefetchSize, prefetchCount, global)); - } - - } - - @Override - public void receiveBasicQosOk(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new BasicQosOkBody()); - } - - } - - @Override - public void receiveBasicConsume(final int channelId, - final AMQShortString queue, - final AMQShortString consumerTag, - final boolean noLocal, - final boolean noAck, - final boolean exclusive, - final boolean nowait, - final FieldTable arguments) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, - new BasicConsumeBody(0, - queue, - consumerTag, - noLocal, - noAck, - exclusive, - nowait, - arguments)); - } - - } - - @Override - public void receiveBasicConsumeOk(final int channelId, final AMQShortString consumerTag) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag)); - } - - } - - @Override - public void receiveBasicCancel(final int channelId, final AMQShortString consumerTag, final boolean noWait) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new BasicCancelBody(consumerTag, noWait)); - } - - } - - @Override - public void receiveBasicCancelOk(final int channelId, final AMQShortString consumerTag) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new BasicCancelOkBody(consumerTag)); - } - - } - - @Override - public void receiveBasicPublish(final int channelId, - final AMQShortString exchange, - final AMQShortString routingKey, - final boolean mandatory, - final boolean immediate) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = - new AMQFrame(channelId, new BasicPublishBody(0, exchange, routingKey, mandatory, immediate)); - } - - } - - @Override - public void receiveBasicReturn(final int channelId, final int replyCode, - final AMQShortString replyText, - final AMQShortString exchange, - final AMQShortString routingKey) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new BasicReturnBody(replyCode, replyText, exchange, routingKey)); - } - - } - - @Override - public void receiveBasicDeliver(final int channelId, - final AMQShortString consumerTag, - final long deliveryTag, - final boolean redelivered, - final AMQShortString exchange, - final AMQShortString routingKey) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, - new BasicDeliverBody(consumerTag, - deliveryTag, - redelivered, - exchange, - routingKey)); - } - - } - - @Override - public void receiveBasicGet(final int channelId, final AMQShortString queue, final boolean noAck) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new BasicGetBody(0, queue, noAck)); - } - - } - - @Override - public void receiveBasicGetOk(final int channelId, - final long deliveryTag, - final boolean redelivered, - final AMQShortString exchange, - final AMQShortString routingKey, - final long messageCount) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, - new BasicGetOkBody(deliveryTag, - redelivered, - exchange, - routingKey, - messageCount)); - } - - } - - @Override - public void receiveBasicGetEmpty(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new BasicGetEmptyBody((AMQShortString) null)); - } - - } - - @Override - public void receiveBasicAck(final int channelId, final long deliveryTag, final boolean multiple) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new BasicAckBody(deliveryTag, multiple)); - } - - } - - @Override - public void receiveBasicReject(final int channelId, final long deliveryTag, final boolean requeue) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new BasicRejectBody(deliveryTag, requeue)); - } - - } - - @Override - public void receiveHeartbeat() - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(0, new HeartbeatBody()); - } - - } - - @Override - public ProtocolVersion getProtocolVersion() - { - return _protocolVersion; - } - - public void setProtocolVersion(final ProtocolVersion protocolVersion) - { - _protocolVersion = protocolVersion; - } - - @Override - public void receiveMessageContent(final int channelId, final byte[] data) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new ContentBody(data)); - } - - } - - @Override - public void receiveMessageHeader(final int channelId, - final BasicContentHeaderProperties properties, - final long bodySize) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new ContentHeaderBody(properties, bodySize)); - } - - } - - @Override - public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation) - { - if (LOGGER.isDebugEnabled()) - { - AMQDataBlock frame = protocolInitiation; - } - - } - - @Override - public void setCurrentMethod(final int classId, final int methodId) - { - _classId = classId; - _methodId = methodId; - } - - private void disposeSaslServer() - { - SaslServer ss = _connection.getSaslServer(); - if (ss != null) - { - _connection.setSaslServer(null); - try - { - ss.dispose(); - } - catch (SaslException e) - { - LOGGER.error("Error disposing of Sasl server: " + e); - } - } - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 695b7c3253..bb98c0abbd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -48,6 +48,7 @@ import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.codec.AMQDecoder; +import org.apache.qpid.codec.ClientDecoder; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; @@ -193,7 +194,7 @@ public class AMQProtocolHandler implements ProtocolEngine _connection = con; _protocolSession = new AMQProtocolSession(this, _connection); _stateManager = new AMQStateManager(_protocolSession); - _decoder = new AMQDecoder(false, _protocolSession.getMethodProcessor()); + _decoder = new ClientDecoder(_protocolSession.getMethodProcessor()); _failoverHandler = new FailoverHandler(this); } diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java index b7904303b5..9d98168687 100644 --- a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java @@ -30,14 +30,8 @@ import java.util.ArrayList; import java.util.List; import java.util.ListIterator; -import org.apache.qpid.framing.AMQDataBlockDecoder; -import org.apache.qpid.framing.AMQFrameDecodingException; -import org.apache.qpid.framing.AMQProtocolVersionException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ByteArrayDataInput; -import org.apache.qpid.framing.EncodingUtils; -import org.apache.qpid.framing.MethodProcessor; -import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.*; +import org.apache.qpid.protocol.AMQConstant; /** * AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a @@ -51,12 +45,9 @@ import org.apache.qpid.framing.ProtocolInitiation; * TODO If protocol initiation decoder not needed, then don't create it. Probably not a big deal, but it adds to the * per-session overhead. */ -public class AMQDecoder +public abstract class AMQDecoder<T extends MethodProcessor> { - private final MethodProcessor _methodProcessor; - - /** Holds the 'normal' AMQP data decoder. */ - private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder(); + private final T _methodProcessor; /** Holds the protocol initiation decoder. */ private ProtocolInitiation.Decoder _piDecoder = new ProtocolInitiation.Decoder(); @@ -67,6 +58,8 @@ public class AMQDecoder private boolean _firstRead = true; + private int _maxFrameSize = AMQConstant.FRAME_MIN_SIZE.getCode(); + private List<ByteArrayInputStream> _remainingBufs = new ArrayList<ByteArrayInputStream>(); /** @@ -75,7 +68,7 @@ public class AMQDecoder * @param expectProtocolInitiation <tt>true</tt> if this decoder needs to handle protocol initiation. * @param methodProcessor method processor */ - public AMQDecoder(boolean expectProtocolInitiation, MethodProcessor methodProcessor) + protected AMQDecoder(boolean expectProtocolInitiation, T methodProcessor) { _expectProtocolInitiation = expectProtocolInitiation; _methodProcessor = methodProcessor; @@ -96,7 +89,12 @@ public class AMQDecoder public void setMaxFrameSize(final int frameMax) { - _dataBlockDecoder.setMaxFrameSize(frameMax); + _maxFrameSize = frameMax; + } + + public T getMethodProcessor() + { + return _methodProcessor; } private class RemainingByteArrayInputStream extends InputStream @@ -254,10 +252,10 @@ public class AMQDecoder { if(!_expectProtocolInitiation) { - enoughData = _dataBlockDecoder.decodable(msg); + enoughData = decodable(msg); if (enoughData) { - _dataBlockDecoder.processInput(_methodProcessor, msg); + processInput(msg); } } else @@ -303,4 +301,105 @@ public class AMQDecoder } } } + + private boolean decodable(final MarkableDataInput in) throws AMQFrameDecodingException, IOException + { + final int remainingAfterAttributes = in.available() - (1 + 2 + 4 + 1); + // type, channel, body length and end byte + if (remainingAfterAttributes < 0) + { + return false; + } + + in.mark(8); + in.skip(1 + 2); + + + // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt() + final long bodySize = in.readInt() & 0xffffffffL; + if (bodySize > _maxFrameSize) + { + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, + "Incoming frame size of " + + bodySize + + " is larger than negotiated maximum of " + + _maxFrameSize); + } + in.reset(); + + return (remainingAfterAttributes >= bodySize); + + } + + private void processInput(final MarkableDataInput in) + throws AMQFrameDecodingException, AMQProtocolVersionException, IOException + { + final byte type = in.readByte(); + + final int channel = in.readUnsignedShort(); + final long bodySize = EncodingUtils.readUnsignedInteger(in); + + // bodySize can be zero + if ((channel < 0) || (bodySize < 0)) + { + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, + "Undecodable frame: type = " + type + " channel = " + channel + + " bodySize = " + bodySize); + } + + processFrame(channel, type, bodySize, in); + + byte marker = in.readByte(); + if ((marker & 0xFF) != 0xCE) + { + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, + "End of frame marker not found. Read " + marker + " length=" + bodySize + + " type=" + type); + } + + } + + protected void processFrame(final int channel, final byte type, final long bodySize, final MarkableDataInput in) + throws AMQFrameDecodingException, IOException + { + switch (type) + { + case 1: + processMethod(channel, in); + break; + case 2: + ContentHeaderBody.process(in, _methodProcessor.getChannelMethodProcessor(channel), bodySize); + break; + case 3: + ContentBody.process(in, _methodProcessor.getChannelMethodProcessor(channel), bodySize); + break; + case 8: + HeartbeatBody.process(channel, in, _methodProcessor, bodySize); + break; + default: + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Unsupported frame type: " + type); + } + } + + + abstract void processMethod(int channelId, + MarkableDataInput in) + throws AMQFrameDecodingException, IOException; + + AMQFrameDecodingException newUnknownMethodException(final int classId, + final int methodId, + ProtocolVersion protocolVersion) + { + return new AMQFrameDecodingException(AMQConstant.COMMAND_INVALID, + "Method " + + methodId + + " unknown in AMQP version " + + protocolVersion + + " (while trying to decode class " + + classId + + " method " + + methodId + + "."); + } + } diff --git a/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java new file mode 100644 index 0000000000..5048193cac --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java @@ -0,0 +1,258 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.codec; + +import java.io.IOException; + +import org.apache.qpid.framing.*; + +public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends ClientChannelMethodProcessor>> +{ + + /** + * Creates a new AMQP decoder. + * + * @param methodProcessor method processor + */ + public ClientDecoder(final ClientMethodProcessor<? extends ClientChannelMethodProcessor> methodProcessor) + { + super(false, methodProcessor); + } + + + void processMethod(int channelId, + MarkableDataInput in) + throws AMQFrameDecodingException, IOException + { + ClientMethodProcessor<? extends ClientChannelMethodProcessor> methodProcessor = getMethodProcessor(); + ClientChannelMethodProcessor channelMethodProcessor = methodProcessor.getChannelMethodProcessor(channelId); + final int classAndMethod = in.readInt(); + int classId = classAndMethod >> 16; + int methodId = classAndMethod & 0xFFFF; + methodProcessor.setCurrentMethod(classId, methodId); + try + { + switch (classAndMethod) + { + //CONNECTION_CLASS: + case 0x000a000a: + ConnectionStartBody.process(in, methodProcessor); + break; + case 0x000a0014: + ConnectionSecureBody.process(in, methodProcessor); + break; + case 0x000a001e: + ConnectionTuneBody.process(in, methodProcessor); + break; + case 0x000a0029: + ConnectionOpenOkBody.process(in, methodProcessor); + break; + case 0x000a002a: + ConnectionRedirectBody.process(in, methodProcessor); + break; + case 0x000a0032: + if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0)) + { + ConnectionRedirectBody.process(in, methodProcessor); + } + else + { + ConnectionCloseBody.process(in, methodProcessor); + } + break; + case 0x000a0033: + if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0)) + { + throw newUnknownMethodException(classId, methodId, + methodProcessor.getProtocolVersion()); + } + else + { + methodProcessor.receiveConnectionCloseOk(); + } + break; + case 0x000a003c: + if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0)) + { + ConnectionCloseBody.process(in, methodProcessor); + } + else + { + throw newUnknownMethodException(classId, methodId, + methodProcessor.getProtocolVersion()); + } + break; + case 0x000a003d: + if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0)) + { + methodProcessor.receiveConnectionCloseOk(); + } + else + { + throw newUnknownMethodException(classId, methodId, + methodProcessor.getProtocolVersion()); + } + break; + + // CHANNEL_CLASS: + + case 0x0014000b: + ChannelOpenOkBody.process(in, methodProcessor.getProtocolVersion(), channelMethodProcessor); + break; + case 0x00140014: + ChannelFlowBody.process(in, channelMethodProcessor); + break; + case 0x00140015: + ChannelFlowOkBody.process(in, channelMethodProcessor); + break; + case 0x0014001e: + ChannelAlertBody.process(in, channelMethodProcessor); + break; + case 0x00140028: + ChannelCloseBody.process(in, channelMethodProcessor); + break; + case 0x00140029: + channelMethodProcessor.receiveChannelCloseOk(); + break; + + // ACCESS_CLASS: + + case 0x001e000b: + AccessRequestOkBody.process(in, channelMethodProcessor); + break; + + // EXCHANGE_CLASS: + + case 0x0028000b: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveExchangeDeclareOk(); + } + break; + case 0x00280015: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveExchangeDeleteOk(); + } + break; + case 0x00280017: + ExchangeBoundOkBody.process(in, channelMethodProcessor); + break; + + + // QUEUE_CLASS: + + case 0x0032000b: + QueueDeclareOkBody.process(in, channelMethodProcessor); + break; + case 0x00320015: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveQueueBindOk(); + } + break; + case 0x0032001f: + QueuePurgeOkBody.process(in, channelMethodProcessor); + break; + case 0x00320029: + QueueDeleteOkBody.process(in, channelMethodProcessor); + break; + case 0x00320033: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveQueueUnbindOk(); + } + break; + + + // BASIC_CLASS: + + case 0x003c000b: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveBasicQosOk(); + } + break; + case 0x003c0015: + BasicConsumeOkBody.process(in, channelMethodProcessor); + break; + case 0x003c001f: + BasicCancelOkBody.process(in, channelMethodProcessor); + break; + case 0x003c0032: + BasicReturnBody.process(in, channelMethodProcessor); + break; + case 0x003c003c: + BasicDeliverBody.process(in, channelMethodProcessor); + break; + case 0x003c0047: + BasicGetOkBody.process(in, channelMethodProcessor); + break; + case 0x003c0048: + BasicGetEmptyBody.process(in, channelMethodProcessor); + break; + case 0x003c0065: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveBasicRecoverSyncOk(); + } + break; + case 0x003c006f: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveBasicRecoverSyncOk(); + } + break; + + // TX_CLASS: + + case 0x005a000b: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveTxSelectOk(); + } + break; + case 0x005a0015: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveTxCommitOk(); + } + break; + case 0x005a001f: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveTxRollbackOk(); + } + break; + + default: + throw newUnknownMethodException(classId, methodId, + methodProcessor.getProtocolVersion()); + + } + } + finally + { + methodProcessor.setCurrentMethod(0, 0); + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java new file mode 100644 index 0000000000..3b138ba278 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java @@ -0,0 +1,234 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.codec; + +import java.io.IOException; + +import org.apache.qpid.framing.*; + +public class ServerDecoder extends AMQDecoder<ServerMethodProcessor<? extends ServerChannelMethodProcessor>> +{ + + /** + * Creates a new AMQP decoder. + * + * @param methodProcessor method processor + */ + public ServerDecoder(final ServerMethodProcessor<? extends ServerChannelMethodProcessor> methodProcessor) + { + super(true, methodProcessor); + } + + void processMethod(int channelId, + MarkableDataInput in) + throws AMQFrameDecodingException, IOException + { + ServerMethodProcessor<? extends ServerChannelMethodProcessor> methodProcessor = getMethodProcessor(); + ServerChannelMethodProcessor channelMethodProcessor = methodProcessor.getChannelMethodProcessor(channelId); + final int classAndMethod = in.readInt(); + int classId = classAndMethod >> 16; + int methodId = classAndMethod & 0xFFFF; + methodProcessor.setCurrentMethod(classId, methodId); + try + { + switch (classAndMethod) + { + //CONNECTION_CLASS: + case 0x000a000b: + ConnectionStartOkBody.process(in, methodProcessor); + break; + case 0x000a0015: + ConnectionSecureOkBody.process(in, methodProcessor); + break; + case 0x000a001f: + ConnectionTuneOkBody.process(in, methodProcessor); + break; + case 0x000a0028: + ConnectionOpenBody.process(in, methodProcessor); + break; + case 0x000a0032: + if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0)) + { + throw newUnknownMethodException(classId, methodId, + methodProcessor.getProtocolVersion()); + } + else + { + ConnectionCloseBody.process(in, methodProcessor); + } + break; + case 0x000a0033: + if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0)) + { + throw newUnknownMethodException(classId, methodId, + methodProcessor.getProtocolVersion()); + } + else + { + methodProcessor.receiveConnectionCloseOk(); + } + break; + case 0x000a003c: + if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0)) + { + ConnectionCloseBody.process(in, methodProcessor); + } + else + { + throw newUnknownMethodException(classId, methodId, + methodProcessor.getProtocolVersion()); + } + break; + case 0x000a003d: + if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0)) + { + methodProcessor.receiveConnectionCloseOk(); + } + else + { + throw newUnknownMethodException(classId, methodId, + methodProcessor.getProtocolVersion()); + } + break; + + // CHANNEL_CLASS: + + case 0x0014000a: + ChannelOpenBody.process(channelId, in, methodProcessor); + break; + case 0x00140014: + ChannelFlowBody.process(in, channelMethodProcessor); + break; + case 0x00140015: + ChannelFlowOkBody.process(in, channelMethodProcessor); + break; + case 0x00140028: + ChannelCloseBody.process(in, channelMethodProcessor); + break; + case 0x00140029: + channelMethodProcessor.receiveChannelCloseOk(); + break; + + // ACCESS_CLASS: + + case 0x001e000a: + AccessRequestBody.process(in, channelMethodProcessor); + break; + + // EXCHANGE_CLASS: + + case 0x0028000a: + ExchangeDeclareBody.process(in, channelMethodProcessor); + break; + case 0x00280014: + ExchangeDeleteBody.process(in, channelMethodProcessor); + break; + case 0x00280016: + ExchangeBoundBody.process(in, channelMethodProcessor); + break; + + + // QUEUE_CLASS: + + case 0x0032000a: + QueueDeclareBody.process(in, channelMethodProcessor); + break; + case 0x00320014: + QueueBindBody.process(in, channelMethodProcessor); + break; + case 0x0032001e: + QueuePurgeBody.process(in, channelMethodProcessor); + break; + case 0x00320028: + QueueDeleteBody.process(in, channelMethodProcessor); + break; + case 0x00320032: + QueueUnbindBody.process(in, channelMethodProcessor); + break; + + + // BASIC_CLASS: + + case 0x003c000a: + BasicQosBody.process(in, channelMethodProcessor); + break; + case 0x003c0014: + BasicConsumeBody.process(in, channelMethodProcessor); + break; + case 0x003c001e: + BasicCancelBody.process(in, channelMethodProcessor); + break; + case 0x003c0028: + BasicPublishBody.process(in, channelMethodProcessor); + break; + case 0x003c0046: + BasicGetBody.process(in, channelMethodProcessor); + break; + case 0x003c0050: + BasicAckBody.process(in, channelMethodProcessor); + break; + case 0x003c005a: + BasicRejectBody.process(in, channelMethodProcessor); + break; + case 0x003c0064: + BasicRecoverBody.process(in, methodProcessor.getProtocolVersion(), channelMethodProcessor); + break; + case 0x003c0066: + BasicRecoverSyncBody.process(in, channelMethodProcessor); + break; + case 0x003c006e: + BasicRecoverSyncBody.process(in, channelMethodProcessor); + break; + + // TX_CLASS: + + case 0x005a000a: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveTxSelect(); + } + break; + case 0x005a0014: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveTxCommit(); + } + break; + case 0x005a001e: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveTxRollback(); + } + break; + + default: + throw newUnknownMethodException(classId, methodId, + methodProcessor.getProtocolVersion()); + + } + } + finally + { + methodProcessor.setCurrentMethod(0, 0); + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java deleted file mode 100644 index 49a88b6bc1..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ /dev/null @@ -1,404 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -import java.io.IOException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.codec.MarkableDataInput; -import org.apache.qpid.protocol.AMQConstant; - -public class AMQDataBlockDecoder -{ - - private Logger _logger = LoggerFactory.getLogger(AMQDataBlockDecoder.class); - private int _maxFrameSize = AMQConstant.FRAME_MIN_SIZE.getCode(); - - public AMQDataBlockDecoder() - { - } - - public boolean decodable(MarkableDataInput in) throws AMQFrameDecodingException, IOException - { - final int remainingAfterAttributes = in.available() - (1 + 2 + 4 + 1); - // type, channel, body length and end byte - if (remainingAfterAttributes < 0) - { - return false; - } - - in.mark(8); - in.skip(1 + 2); - - - // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt() - final long bodySize = in.readInt() & 0xffffffffL; - if (bodySize > _maxFrameSize) - { - throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, - "Incoming frame size of " - + bodySize - + " is larger than negotiated maximum of " - + _maxFrameSize); - } - in.reset(); - - return (remainingAfterAttributes >= bodySize); - - } - - public void processInput(MethodProcessor processor, - MarkableDataInput in) - throws AMQFrameDecodingException, AMQProtocolVersionException, IOException - { - final byte type = in.readByte(); - - final int channel = in.readUnsignedShort(); - final long bodySize = EncodingUtils.readUnsignedInteger(in); - - // bodySize can be zero - if ((channel < 0) || (bodySize < 0)) - { - throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, - "Undecodable frame: type = " + type + " channel = " + channel - + " bodySize = " + bodySize); - } - - switch (type) - { - case 1: - processMethod(channel, in, processor); - break; - case 2: - ContentHeaderBody.process(channel, in, processor, bodySize); - break; - case 3: - ContentBody.process(channel, in, processor, bodySize); - break; - case 8: - HeartbeatBody.process(channel, in, processor, bodySize); - break; - default: - throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Unsupported frame type: " + type); - } - - byte marker = in.readByte(); - if ((marker & 0xFF) != 0xCE) - { - throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, - "End of frame marker not found. Read " + marker + " length=" + bodySize - + " type=" + type); - } - - } - - public void setMaxFrameSize(final int maxFrameSize) - { - _maxFrameSize = maxFrameSize; - } - - private void processMethod(int channelId, - MarkableDataInput in, - MethodProcessor dispatcher) - throws AMQFrameDecodingException, IOException - { - final int classAndMethod = in.readInt(); - int classId = classAndMethod >> 16; - int methodId = classAndMethod & 0xFFFF; - dispatcher.setCurrentMethod(classId, methodId); - try - { - switch (classAndMethod) - { - //CONNECTION_CLASS: - case 0x000a000a: - ConnectionStartBody.process(in, dispatcher); - break; - case 0x000a000b: - ConnectionStartOkBody.process(in, dispatcher); - break; - case 0x000a0014: - ConnectionSecureBody.process(in, dispatcher); - break; - case 0x000a0015: - ConnectionSecureOkBody.process(in, dispatcher); - break; - case 0x000a001e: - ConnectionTuneBody.process(in, dispatcher); - break; - case 0x000a001f: - ConnectionTuneOkBody.process(in, dispatcher); - break; - case 0x000a0028: - ConnectionOpenBody.process(in, dispatcher); - break; - case 0x000a0029: - ConnectionOpenOkBody.process(in, dispatcher); - break; - case 0x000a002a: - ConnectionRedirectBody.process(in, dispatcher); - break; - case 0x000a0032: - if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) - { - ConnectionRedirectBody.process(in, dispatcher); - } - else - { - ConnectionCloseBody.process(in, dispatcher); - } - break; - case 0x000a0033: - if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) - { - throw newUnknownMethodException(classId, methodId, - dispatcher.getProtocolVersion()); - } - else - { - dispatcher.receiveConnectionCloseOk(); - } - break; - case 0x000a003c: - if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) - { - ConnectionCloseBody.process(in, dispatcher); - } - else - { - throw newUnknownMethodException(classId, methodId, - dispatcher.getProtocolVersion()); - } - break; - case 0x000a003d: - if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0)) - { - dispatcher.receiveConnectionCloseOk(); - } - else - { - throw newUnknownMethodException(classId, methodId, - dispatcher.getProtocolVersion()); - } - break; - - // CHANNEL_CLASS: - - case 0x0014000a: - ChannelOpenBody.process(channelId, in, dispatcher); - break; - case 0x0014000b: - ChannelOpenOkBody.process(channelId, in, dispatcher.getProtocolVersion(), dispatcher); - break; - case 0x00140014: - ChannelFlowBody.process(channelId, in, dispatcher); - break; - case 0x00140015: - ChannelFlowOkBody.process(channelId, in, dispatcher); - break; - case 0x0014001e: - ChannelAlertBody.process(channelId, in, dispatcher); - break; - case 0x00140028: - ChannelCloseBody.process(channelId, in, dispatcher); - break; - case 0x00140029: - dispatcher.receiveChannelCloseOk(channelId); - break; - - // ACCESS_CLASS: - - case 0x001e000a: - AccessRequestBody.process(channelId, in, dispatcher); - break; - case 0x001e000b: - AccessRequestOkBody.process(channelId, in, dispatcher); - break; - - // EXCHANGE_CLASS: - - case 0x0028000a: - ExchangeDeclareBody.process(channelId, in, dispatcher); - break; - case 0x0028000b: - dispatcher.receiveExchangeDeclareOk(channelId); - break; - case 0x00280014: - ExchangeDeleteBody.process(channelId, in, dispatcher); - break; - case 0x00280015: - dispatcher.receiveExchangeDeleteOk(channelId); - break; - case 0x00280016: - ExchangeBoundBody.process(channelId, in, dispatcher); - break; - case 0x00280017: - ExchangeBoundOkBody.process(channelId, in, dispatcher); - break; - - - // QUEUE_CLASS: - - case 0x0032000a: - QueueDeclareBody.process(channelId, in, dispatcher); - break; - case 0x0032000b: - QueueDeclareOkBody.process(channelId, in, dispatcher); - break; - case 0x00320014: - QueueBindBody.process(channelId, in, dispatcher); - break; - case 0x00320015: - dispatcher.receiveQueueBindOk(channelId); - break; - case 0x0032001e: - QueuePurgeBody.process(channelId, in, dispatcher); - break; - case 0x0032001f: - QueuePurgeOkBody.process(channelId, in, dispatcher); - break; - case 0x00320028: - QueueDeleteBody.process(channelId, in, dispatcher); - break; - case 0x00320029: - QueueDeleteOkBody.process(channelId, in, dispatcher); - break; - case 0x00320032: - QueueUnbindBody.process(channelId, in, dispatcher); - break; - case 0x00320033: - dispatcher.receiveQueueUnbindOk(channelId); - break; - - - // BASIC_CLASS: - - case 0x003c000a: - BasicQosBody.process(channelId, in, dispatcher); - break; - case 0x003c000b: - dispatcher.receiveBasicQosOk(channelId); - break; - case 0x003c0014: - BasicConsumeBody.process(channelId, in, dispatcher); - break; - case 0x003c0015: - BasicConsumeOkBody.process(channelId, in, dispatcher); - break; - case 0x003c001e: - BasicCancelBody.process(channelId, in, dispatcher); - break; - case 0x003c001f: - BasicCancelOkBody.process(channelId, in, dispatcher); - break; - case 0x003c0028: - BasicPublishBody.process(channelId, in, dispatcher); - break; - case 0x003c0032: - BasicReturnBody.process(channelId, in, dispatcher); - break; - case 0x003c003c: - BasicDeliverBody.process(channelId, in, dispatcher); - break; - case 0x003c0046: - BasicGetBody.process(channelId, in, dispatcher); - break; - case 0x003c0047: - BasicGetOkBody.process(channelId, in, dispatcher); - break; - case 0x003c0048: - BasicGetEmptyBody.process(channelId, in, dispatcher); - break; - case 0x003c0050: - BasicAckBody.process(channelId, in, dispatcher); - break; - case 0x003c005a: - BasicRejectBody.process(channelId, in, dispatcher); - break; - case 0x003c0064: - BasicRecoverBody.process(channelId, in, dispatcher.getProtocolVersion(), dispatcher); - break; - case 0x003c0065: - dispatcher.receiveBasicRecoverSyncOk(channelId); - break; - case 0x003c0066: - BasicRecoverSyncBody.process(channelId, in, dispatcher); - break; - case 0x003c006e: - BasicRecoverSyncBody.process(channelId, in, dispatcher); - break; - case 0x003c006f: - dispatcher.receiveBasicRecoverSyncOk(channelId); - break; - - // TX_CLASS: - - case 0x005a000a: - dispatcher.receiveTxSelect(channelId); - break; - case 0x005a000b: - dispatcher.receiveTxSelectOk(channelId); - break; - case 0x005a0014: - dispatcher.receiveTxCommit(channelId); - break; - case 0x005a0015: - dispatcher.receiveTxCommitOk(channelId); - break; - case 0x005a001e: - dispatcher.receiveTxRollback(channelId); - break; - case 0x005a001f: - dispatcher.receiveTxRollbackOk(channelId); - break; - - default: - throw newUnknownMethodException(classId, methodId, - dispatcher.getProtocolVersion()); - - } - } - finally - { - dispatcher.setCurrentMethod(0,0); - } - } - - private AMQFrameDecodingException newUnknownMethodException(final int classId, - final int methodId, - ProtocolVersion protocolVersion) - { - return new AMQFrameDecodingException(AMQConstant.COMMAND_INVALID, - "Method " - + methodId - + " unknown in AMQP version " - + protocolVersion - + " (while trying to decode class " - + classId - + " method " - + methodId - + "."); - } - - -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java index ce2a5a1317..8dec50c400 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java @@ -165,9 +165,8 @@ public class AccessRequestBody extends AMQMethodBodyImpl implements EncodableAMQ return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException { AMQShortString realm = buffer.readAMQShortString(); byte bitfield = buffer.readByte(); @@ -176,6 +175,9 @@ public class AccessRequestBody extends AMQMethodBodyImpl implements EncodableAMQ boolean active = (bitfield & 0x04) == 0x4 ; boolean write = (bitfield & 0x08) == 0x8 ; boolean read = (bitfield & 0x10) == 0x10 ; - dispatcher.receiveAccessRequest(channelId, realm, exclusive, passive, active, write, read); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveAccessRequest(realm, exclusive, passive, active, write, read); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java index 10be4d45c8..7ed0b3602b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java @@ -95,10 +95,14 @@ public class AccessRequestOkBody extends AMQMethodBodyImpl implements EncodableA return buf.toString(); } - public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput buffer, + final ClientChannelMethodProcessor dispatcher) throws IOException { int ticket = buffer.readUnsignedShort(); - dispatcher.receiveAccessRequestOk(channelId, ticket); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveAccessRequestOk(ticket); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java index 70e3f10148..68782231fe 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java @@ -112,13 +112,15 @@ public class BasicAckBody extends AMQMethodBodyImpl implements EncodableAMQDataB return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException { long deliveryTag = buffer.readLong(); boolean multiple = (buffer.readByte() & 0x01) != 0; - dispatcher.receiveBasicAck(channelId, deliveryTag, multiple); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicAck(deliveryTag, multiple); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java index 6f74b3870a..c9a870e2a5 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java @@ -113,13 +113,15 @@ public class BasicCancelBody extends AMQMethodBodyImpl implements EncodableAMQDa return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException { AMQShortString consumerTag = buffer.readAMQShortString(); boolean noWait = (buffer.readByte() & 0x01) == 0x01; - dispatcher.receiveBasicCancel(channelId, consumerTag, noWait); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicCancel(consumerTag, noWait); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java index 0e9bc52d66..8d16aa44ec 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java @@ -96,10 +96,14 @@ public class BasicCancelOkBody extends AMQMethodBodyImpl implements EncodableAMQ return buf.toString(); } - public static void process(final int channelId, final MarkableDataInput in, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput in, + final ClientChannelMethodProcessor dispatcher) throws IOException { AMQShortString consumerTag = in.readAMQShortString(); - dispatcher.receiveBasicCancelOk(channelId, consumerTag); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicCancelOk(consumerTag); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java index 94396418fe..502fa07e78 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java @@ -191,7 +191,8 @@ public class BasicConsumeBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } - public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { @@ -205,6 +206,9 @@ public class BasicConsumeBody extends AMQMethodBodyImpl implements EncodableAMQD boolean exclusive = (bitfield & 0x04) == 0x04; boolean nowait = (bitfield & 0x08) == 0x08; FieldTable arguments = EncodingUtils.readFieldTable(buffer); - dispatcher.receiveBasicConsume(channelId, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicConsume(queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java index d42c722fdf..d3df7f222a 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java @@ -96,10 +96,14 @@ public class BasicConsumeOkBody extends AMQMethodBodyImpl implements EncodableAM return buf.toString(); } - public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput buffer, + final ClientChannelMethodProcessor dispatcher) throws IOException { AMQShortString consumerTag = buffer.readAMQShortString(); - dispatcher.receiveBasicConsumeOk(channelId, consumerTag); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicConsumeOk(consumerTag); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java index afa38d1852..f61ee2d55b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java @@ -152,9 +152,8 @@ public class BasicDeliverBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ClientChannelMethodProcessor dispatcher) throws IOException { AMQShortString consumerTag = buffer.readAMQShortString(); @@ -162,6 +161,9 @@ public class BasicDeliverBody extends AMQMethodBodyImpl implements EncodableAMQD boolean redelivered = (buffer.readByte() & 0x01) != 0; AMQShortString exchange = buffer.readAMQShortString(); AMQShortString routingKey = buffer.readAMQShortString(); - dispatcher.receiveBasicDeliver(channelId, consumerTag, deliveryTag, redelivered, exchange, routingKey); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java index 93429b97d8..68a6f2980b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java @@ -125,13 +125,17 @@ public class BasicGetBody extends AMQMethodBodyImpl implements EncodableAMQDataB return buf.toString(); } - public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException { int ticket = buffer.readUnsignedShort(); AMQShortString queue = buffer.readAMQShortString(); boolean noAck = (buffer.readByte() & 0x01) != 0; - dispatcher.receiveBasicGet(channelId, queue, noAck); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicGet(queue, noAck); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java index a42df6bcc7..f37fb632db 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java @@ -96,11 +96,13 @@ public class BasicGetEmptyBody extends AMQMethodBodyImpl implements EncodableAMQ return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ClientChannelMethodProcessor dispatcher) throws IOException { AMQShortString clusterId = buffer.readAMQShortString(); - dispatcher.receiveBasicGetEmpty(channelId); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicGetEmpty(); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java index b8af656a35..37e9bdae5a 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java @@ -151,15 +151,17 @@ public class BasicGetOkBody extends AMQMethodBodyImpl implements EncodableAMQDat return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ClientChannelMethodProcessor dispatcher) throws IOException { long deliveryTag = buffer.readLong(); boolean redelivered = (buffer.readByte() & 0x01) != 0; AMQShortString exchange = buffer.readAMQShortString(); AMQShortString routingKey = buffer.readAMQShortString(); long messageCount = EncodingUtils.readUnsignedInteger(buffer); - dispatcher.receiveBasicGetOk(channelId, deliveryTag, redelivered, exchange, routingKey, messageCount); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicGetOk(deliveryTag, redelivered, exchange, routingKey, messageCount); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java index 910942c2f1..8e5d71a804 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java @@ -151,9 +151,8 @@ public class BasicPublishBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException { int ticket = buffer.readUnsignedShort(); @@ -163,6 +162,9 @@ public class BasicPublishBody extends AMQMethodBodyImpl implements EncodableAMQD boolean mandatory = (bitfield & 0x01) != 0; boolean immediate = (bitfield & 0x02) != 0; - dispatcher.receiveBasicPublish(channelId, exchange, routingKey, mandatory, immediate); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicPublish(exchange, routingKey, mandatory, immediate); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java index fb6b6956c6..6b7e90f41f 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java @@ -124,14 +124,16 @@ public class BasicQosBody extends AMQMethodBodyImpl implements EncodableAMQDataB return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException { long prefetchSize = EncodingUtils.readUnsignedInteger(buffer); int prefetchCount = buffer.readUnsignedShort(); boolean global = (buffer.readByte() & 0x01) == 0x01; - dispatcher.receiveBasicQos(channelId, prefetchSize, prefetchCount, global); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicQos(prefetchSize, prefetchCount, global); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java index 2519f25fbe..e5490c4827 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java @@ -100,14 +100,16 @@ public class BasicRecoverBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput in, - final ProtocolVersion protocolVersion, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput in, + final ProtocolVersion protocolVersion, + final ServerChannelMethodProcessor dispatcher) throws IOException { boolean requeue = (in.readByte() & 0x01) == 0x01; boolean sync = (ProtocolVersion.v8_0.equals(protocolVersion)); - dispatcher.receiveBasicRecover(channelId, requeue, sync); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicRecover(requeue, sync); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java index 16c9798977..f82ee78862 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java @@ -103,11 +103,13 @@ public class BasicRecoverSyncBody extends AMQMethodBodyImpl implements Encodable return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput in, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput in, + final ServerChannelMethodProcessor dispatcher) throws IOException { boolean requeue = (in.readByte() & 0x01) == 0x01; - dispatcher.receiveBasicRecover(channelId, requeue, true); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicRecover(requeue, true); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java index 8e1ebf4013..8c8757f1d2 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java @@ -112,13 +112,15 @@ public class BasicRejectBody extends AMQMethodBodyImpl implements EncodableAMQDa return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException { long deliveryTag = buffer.readLong(); boolean requeue = (buffer.readByte() & 0x01) != 0; - dispatcher.receiveBasicReject(channelId, deliveryTag, requeue); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicReject(deliveryTag, requeue); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java b/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java index cff9914705..afdb343c9f 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java @@ -134,15 +134,17 @@ public class BasicReturnBody extends AMQMethodBodyImpl implements EncodableAMQDa return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ClientChannelMethodProcessor dispatcher) throws IOException { int replyCode = buffer.readUnsignedShort(); AMQShortString replyText = buffer.readAMQShortString(); AMQShortString exchange = buffer.readAMQShortString(); AMQShortString routingKey = buffer.readAMQShortString(); - dispatcher.receiveBasicReturn(channelId, replyCode, replyText, exchange, routingKey); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicReturn(replyCode, replyText, exchange, routingKey); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java index 11dcffc175..289cf2cc10 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java @@ -121,13 +121,17 @@ public class ChannelAlertBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } - public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput buffer, + final ClientChannelMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { int replyCode = buffer.readUnsignedShort(); AMQShortString replyText = buffer.readAMQShortString(); FieldTable details = EncodingUtils.readFieldTable(buffer); - dispatcher.receiveChannelAlert(channelId, replyCode, replyText, details); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveChannelAlert(replyCode, replyText, details); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java index a4f54fbe7d..a3b92a1fad 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java @@ -132,15 +132,17 @@ public class ChannelCloseBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ChannelMethodProcessor dispatcher) throws IOException { int replyCode = buffer.readUnsignedShort(); AMQShortString replyText = buffer.readAMQShortString(); int classId = buffer.readUnsignedShort(); int methodId = buffer.readUnsignedShort(); - dispatcher.receiveChannelClose(channelId, replyCode, replyText, classId, methodId); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveChannelClose(replyCode, replyText, classId, methodId); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java index c975744d9f..1c3cc47d4e 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java @@ -92,11 +92,13 @@ public class ChannelFlowBody extends AMQMethodBodyImpl implements EncodableAMQDa return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ChannelMethodProcessor dispatcher) throws IOException { boolean active = (buffer.readByte() & 0x01) == 0x01; - dispatcher.receiveChannelFlow(channelId, active); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveChannelFlow(active); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java index a62c6155f8..9d4a2b09a1 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java @@ -93,10 +93,14 @@ public class ChannelFlowOkBody extends AMQMethodBodyImpl implements EncodableAMQ return buf.toString(); } - public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput buffer, + final ChannelMethodProcessor dispatcher) throws IOException { boolean active = (buffer.readByte() & 0x01) == 0x01; - dispatcher.receiveChannelFlowOk(channelId, active); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveChannelFlowOk(active); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java new file mode 100644 index 0000000000..84cd1e13c2 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +public interface ChannelMethodProcessor +{ + void receiveChannelFlow(boolean active); + + void receiveChannelFlowOk(boolean active); + + void receiveChannelClose(int replyCode, AMQShortString replyText, int classId, int methodId); + + void receiveChannelCloseOk(); + + void receiveMessageContent(byte[] data); + + void receiveMessageHeader(BasicContentHeaderProperties properties, long bodySize); + + boolean ignoreAllButCloseOk(); +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java index 9da45d3d70..af583f5fda 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java @@ -84,9 +84,12 @@ public class ChannelOpenBody extends AMQMethodBodyImpl implements EncodableAMQDa public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + final ServerMethodProcessor dispatcher) throws IOException { buffer.readAMQShortString(); - dispatcher.receiveChannelOpen(channelId); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveChannelOpen(channelId); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java index 775a08fbd4..e3b4f38a8c 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java @@ -96,16 +96,18 @@ public class ChannelOpenOkBody extends AMQMethodBodyImpl implements EncodableAMQ return "[ChannelOpenOkBody]"; } - public static void process(final int channelId, - final MarkableDataInput in, - final ProtocolVersion protocolVersion, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput in, + final ProtocolVersion protocolVersion, + final ClientChannelMethodProcessor dispatcher) throws IOException { if(!ProtocolVersion.v8_0.equals(protocolVersion)) { EncodingUtils.readBytes(in); } - dispatcher.receiveChannelOpenOk(channelId); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveChannelOpenOk(); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java new file mode 100644 index 0000000000..bef143e39b --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +public interface ClientChannelMethodProcessor extends ChannelMethodProcessor +{ + void receiveChannelOpenOk(); + + void receiveChannelAlert(int replyCode, final AMQShortString replyText, FieldTable details); + + void receiveAccessRequestOk(int ticket); + + void receiveExchangeDeclareOk(); + + void receiveExchangeDeleteOk(); + + void receiveExchangeBoundOk(int replyCode, AMQShortString replyText); + + void receiveQueueBindOk(); + + void receiveQueueUnbindOk(); + + void receiveQueueDeclareOk(final AMQShortString queue, long messageCount, long consumerCount); + + void receiveQueuePurgeOk(long messageCount); + + void receiveQueueDeleteOk(long messageCount); + + void receiveBasicRecoverSyncOk(); + + void receiveBasicQosOk(); + + void receiveBasicConsumeOk(AMQShortString consumerTag); + + void receiveBasicCancelOk(AMQShortString consumerTag); + + void receiveBasicReturn(int replyCode, + AMQShortString replyText, + AMQShortString exchange, + AMQShortString routingKey); + + void receiveBasicDeliver(AMQShortString consumerTag, + long deliveryTag, + boolean redelivered, + AMQShortString exchange, AMQShortString routingKey); + + void receiveBasicGetOk(long deliveryTag, + boolean redelivered, + AMQShortString exchange, + AMQShortString routingKey, long messageCount); + + void receiveBasicGetEmpty(); + + void receiveTxSelectOk(); + + void receiveTxCommitOk(); + + void receiveTxRollbackOk(); + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ClientMethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/ClientMethodProcessor.java new file mode 100644 index 0000000000..0b599ee40a --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/ClientMethodProcessor.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +public interface ClientMethodProcessor<T extends ClientChannelMethodProcessor> extends MethodProcessor<T> +{ + void receiveConnectionStart(short versionMajor, + short versionMinor, + FieldTable serverProperties, + byte[] mechanisms, + byte[] locales); + + void receiveConnectionSecure(byte[] challenge); + + void receiveConnectionRedirect(AMQShortString host, AMQShortString knownHosts); + + void receiveConnectionTune(int channelMax, long frameMax, int heartbeat); + + void receiveConnectionOpenOk(AMQShortString knownHosts); + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java index 0e685deb7c..7fb815ae40 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java @@ -121,12 +121,15 @@ public class ConnectionOpenBody extends AMQMethodBodyImpl implements EncodableAM return buf.toString(); } - public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, final ServerMethodProcessor dispatcher) throws IOException { AMQShortString virtualHost = buffer.readAMQShortString(); AMQShortString capabilities = buffer.readAMQShortString(); boolean insist = (buffer.readByte() & 0x01) == 0x01; - dispatcher.receiveConnectionOpen(virtualHost, capabilities, insist); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveConnectionOpen(virtualHost, capabilities, insist); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java index 6d1e80c624..95c48873f3 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java @@ -96,10 +96,13 @@ public class ConnectionOpenOkBody extends AMQMethodBodyImpl implements Encodable return buf.toString(); } - public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, final ClientMethodProcessor dispatcher) throws IOException { AMQShortString knownHosts = buffer.readAMQShortString(); - dispatcher.receiveConnectionOpenOk(knownHosts); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveConnectionOpenOk(knownHosts); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java index a9b9a43b1a..491cc25125 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java @@ -108,10 +108,13 @@ public class ConnectionRedirectBody extends AMQMethodBodyImpl implements Encodab return buf.toString(); } - public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, final ClientMethodProcessor dispatcher) throws IOException { AMQShortString host = buffer.readAMQShortString(); AMQShortString knownHosts = buffer.readAMQShortString(); - dispatcher.receiveConnectionRedirect(host, knownHosts); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveConnectionRedirect(host, knownHosts); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java index 1f7f2b0221..e10af3b4c1 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java @@ -96,11 +96,14 @@ public class ConnectionSecureBody extends AMQMethodBodyImpl implements Encodable return buf.toString(); } - public static void process(final MarkableDataInput in, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput in, final ClientMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { byte[] challenge = EncodingUtils.readBytes(in); - dispatcher.receiveConnectionSecure(challenge); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveConnectionSecure(challenge); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java index 9a4668a9c7..4c4a249bb6 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java @@ -96,9 +96,12 @@ public class ConnectionSecureOkBody extends AMQMethodBodyImpl implements Encodab return buf.toString(); } - public static void process(final MarkableDataInput in, final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput in, final ServerMethodProcessor dispatcher) throws IOException { byte[] response = EncodingUtils.readBytes(in); - dispatcher.receiveConnectionSecureOk(response); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveConnectionSecureOk(response); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java index 4f47f0632f..3b94919d4e 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java @@ -136,7 +136,7 @@ public class ConnectionStartBody extends AMQMethodBodyImpl implements EncodableA return buf.toString(); } - public static void process(final MarkableDataInput in, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput in, final ClientMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { short versionMajor = (short) in.readUnsignedByte(); @@ -145,7 +145,9 @@ public class ConnectionStartBody extends AMQMethodBodyImpl implements EncodableA byte[] mechanisms = EncodingUtils.readBytes(in); byte[] locales = EncodingUtils.readBytes(in); - - dispatcher.receiveConnectionStart(versionMajor, versionMinor, serverProperties, mechanisms, locales); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveConnectionStart(versionMajor, versionMinor, serverProperties, mechanisms, locales); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java index da3d0a2c56..5b6a8e3ef7 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java @@ -126,7 +126,7 @@ public class ConnectionStartOkBody extends AMQMethodBodyImpl implements Encodabl return buf.toString(); } - public static void process(final MarkableDataInput in, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput in, final ServerMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { @@ -134,7 +134,9 @@ public class ConnectionStartOkBody extends AMQMethodBodyImpl implements Encodabl AMQShortString mechanism = in.readAMQShortString(); byte[] response = EncodingUtils.readBytes(in); AMQShortString locale = in.readAMQShortString(); - - dispatcher.receiveConnectionStartOk(clientProperties, mechanism, response, locale); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveConnectionStartOk(clientProperties, mechanism, response, locale); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java index 3383fd889a..04def21d44 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java @@ -119,12 +119,15 @@ public class ConnectionTuneBody extends AMQMethodBodyImpl implements EncodableAM return buf.toString(); } - public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, final ClientMethodProcessor dispatcher) throws IOException { int channelMax = buffer.readUnsignedShort(); long frameMax = EncodingUtils.readUnsignedInteger(buffer); int heartbeat = buffer.readUnsignedShort(); - dispatcher.receiveConnectionTune(channelMax, frameMax, heartbeat); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveConnectionTune(channelMax, frameMax, heartbeat); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java index f695eda2c4..3141a85766 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java @@ -119,12 +119,15 @@ public class ConnectionTuneOkBody extends AMQMethodBodyImpl implements Encodable return buf.toString(); } - public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, final ServerMethodProcessor dispatcher) throws IOException { int channelMax = buffer.readUnsignedShort(); long frameMax = EncodingUtils.readUnsignedInteger(buffer); int heartbeat = buffer.readUnsignedShort(); - dispatcher.receiveConnectionTuneOk(channelMax, frameMax, heartbeat); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveConnectionTuneOk(channelMax, frameMax, heartbeat); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index 01beb3af77..4d9826d83c 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -21,7 +21,6 @@ package org.apache.qpid.framing; import java.io.DataInput; -import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; @@ -73,33 +72,20 @@ public class ContentBody implements AMQBody session.contentBodyReceived(channelId, this); } - protected void populateFromBuffer(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException - { - if (size > 0) - { - _payload = new byte[(int)size]; - buffer.read(getPayload()); - } - - } - - public void reduceBufferToFit() - { - } - public byte[] getPayload() { return _payload; } - public static void process(final int channel, - final MarkableDataInput in, - final MethodProcessor methodProcessor, final long bodySize) + public static void process(final MarkableDataInput in, + final ChannelMethodProcessor methodProcessor, final long bodySize) throws IOException { + byte[] payload = new byte[(int)bodySize]; in.readFully(payload); - methodProcessor.receiveMessageContent(channel, payload); + + methodProcessor.receiveMessageContent(payload); } private static class BufferContentBody implements AMQBody diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java index 0d54e09ae5..0d25e4dfba 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -155,9 +155,8 @@ public class ContentHeaderBody implements AMQBody _bodySize = bodySize; } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor methodProcessor, final long size) + public static void process(final MarkableDataInput buffer, + final ChannelMethodProcessor methodProcessor, final long size) throws IOException, AMQFrameDecodingException { @@ -168,13 +167,13 @@ public class ContentHeaderBody implements AMQBody BasicContentHeaderProperties properties; - if (classId != BasicConsumeBody.CLASS_ID) + if (classId != CLASS_ID) { throw new AMQFrameDecodingException(null, "Unsupported content header class id: " + classId, null); } - properties = new BasicContentHeaderProperties(); + properties = new BasicContentHeaderProperties(); properties.populatePropertiesFromBuffer(buffer, propertyFlags, (int)(size-14)); - methodProcessor.receiveMessageHeader(channelId, properties, bodySize); + methodProcessor.receiveMessageHeader(properties, bodySize); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java index 7548db6e93..e8dc2ae442 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java @@ -122,13 +122,17 @@ public class ExchangeBoundBody extends AMQMethodBodyImpl implements EncodableAMQ return buf.toString(); } - public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException { AMQShortString exchange = buffer.readAMQShortString(); AMQShortString routingKey = buffer.readAMQShortString(); AMQShortString queue = buffer.readAMQShortString(); - dispatcher.receiveExchangeBound(channelId, exchange, routingKey, queue); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveExchangeBound(exchange, routingKey, queue); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java index 6b02b066ae..ef91c1d635 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java @@ -115,12 +115,16 @@ public class ExchangeBoundOkBody extends AMQMethodBodyImpl implements EncodableA return buf.toString(); } - public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput buffer, + final ClientChannelMethodProcessor dispatcher) throws IOException { int replyCode = buffer.readUnsignedShort(); AMQShortString replyText = buffer.readAMQShortString(); - dispatcher.receiveExchangeBoundOk(channelId, replyCode, replyText); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveExchangeBoundOk(replyCode, replyText); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java index 06e590f8e5..4001ba7aa0 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java @@ -204,9 +204,8 @@ public class ExchangeDeclareBody extends AMQMethodBodyImpl implements EncodableA return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { int ticket = buffer.readUnsignedShort(); @@ -219,14 +218,16 @@ public class ExchangeDeclareBody extends AMQMethodBodyImpl implements EncodableA boolean internal = (bitfield & 0x8) == 0x8; boolean nowait = (bitfield & 0x10) == 0x10; FieldTable arguments = EncodingUtils.readFieldTable(buffer); - dispatcher.receiveExchangeDeclare(channelId, - exchange, - type, - passive, - durable, - autoDelete, - internal, - nowait, - arguments); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveExchangeDeclare(exchange, + type, + passive, + durable, + autoDelete, + internal, + nowait, + arguments); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java index 4a30e25502..f4646315cd 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java @@ -138,7 +138,8 @@ public class ExchangeDeleteBody extends AMQMethodBodyImpl implements EncodableAM return buf.toString(); } - public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException { @@ -147,6 +148,9 @@ public class ExchangeDeleteBody extends AMQMethodBodyImpl implements EncodableAM byte bitfield = buffer.readByte(); boolean ifUnused = (bitfield & 0x01) == 0x01; boolean nowait = (bitfield & 0x02) == 0x02; - dispatcher.receiveExchangeDelete(channelId, exchange, ifUnused, nowait); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveExchangeDelete(exchange, ifUnused, nowait); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java index 1ad0f3081b..19b091a359 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java +++ b/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java @@ -23,7 +23,9 @@ package org.apache.qpid.framing; import java.util.ArrayList; import java.util.List; -public class FrameCreatingMethodProcessor implements MethodProcessor +public class FrameCreatingMethodProcessor implements MethodProcessor<FrameCreatingMethodProcessor.ClientAndServerChannelMethodProcessor>, + ClientMethodProcessor<FrameCreatingMethodProcessor.ClientAndServerChannelMethodProcessor>, + ServerMethodProcessor<FrameCreatingMethodProcessor.ClientAndServerChannelMethodProcessor> { private ProtocolVersion _protocolVersion; @@ -61,42 +63,6 @@ public class FrameCreatingMethodProcessor implements MethodProcessor } @Override - public void receiveTxSelect(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, TxSelectBody.INSTANCE)); - } - - @Override - public void receiveTxSelectOk(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, TxSelectOkBody.INSTANCE)); - } - - @Override - public void receiveTxCommit(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, TxCommitBody.INSTANCE)); - } - - @Override - public void receiveTxCommitOk(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, TxCommitOkBody.INSTANCE)); - } - - @Override - public void receiveTxRollback(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, TxRollbackBody.INSTANCE)); - } - - @Override - public void receiveTxRollbackOk(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, TxRollbackOkBody.INSTANCE)); - } - - @Override public void receiveConnectionSecure(final byte[] challenge) { _processedMethods.add(new AMQFrame(0, new ConnectionSecureBody(challenge))); @@ -163,382 +129,483 @@ public class FrameCreatingMethodProcessor implements MethodProcessor _processedMethods.add(new AMQFrame(channelId, new ChannelOpenBody())); } - @Override - public void receiveChannelOpenOk(final int channelId) + private void receiveExchangeBoundOk(final int channelId, final int replyCode, final AMQShortString replyText) { - _processedMethods.add(new AMQFrame(channelId, ProtocolVersion.v8_0.equals(getProtocolVersion()) - ? ChannelOpenOkBody.INSTANCE_0_8 - : ChannelOpenOkBody.INSTANCE_0_9)); + _processedMethods.add(new AMQFrame(channelId, new ExchangeBoundOkBody(replyCode, replyText))); } @Override - public void receiveChannelFlow(final int channelId, final boolean active) + public void receiveHeartbeat() { - _processedMethods.add(new AMQFrame(channelId, new ChannelFlowBody(active))); + _processedMethods.add(new AMQFrame(0, new HeartbeatBody())); } @Override - public void receiveChannelFlowOk(final int channelId, final boolean active) + public ProtocolVersion getProtocolVersion() { - _processedMethods.add(new AMQFrame(channelId, new ChannelFlowOkBody(active))); + return _protocolVersion; } @Override - public void receiveChannelAlert(final int channelId, - final int replyCode, - final AMQShortString replyText, - final FieldTable details) + public ClientAndServerChannelMethodProcessor getChannelMethodProcessor(final int channelId) { - _processedMethods.add(new AMQFrame(channelId, new ChannelAlertBody(replyCode, replyText, details))); + return new FrameCreatingChannelMethodProcessor(channelId); } - @Override - public void receiveChannelClose(final int channelId, - final int replyCode, - final AMQShortString replyText, - final int classId, - final int methodId) + public void setProtocolVersion(final ProtocolVersion protocolVersion) { - _processedMethods.add(new AMQFrame(channelId, new ChannelCloseBody(replyCode, replyText, classId, methodId))); + _protocolVersion = protocolVersion; } @Override - public void receiveChannelCloseOk(final int channelId) + public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation) { - _processedMethods.add(new AMQFrame(channelId, ChannelCloseOkBody.INSTANCE)); + _processedMethods.add(protocolInitiation); } @Override - public void receiveAccessRequest(final int channelId, - final AMQShortString realm, - final boolean exclusive, - final boolean passive, - final boolean active, - final boolean write, - final boolean read) + public void setCurrentMethod(final int classId, final int methodId) { - _processedMethods.add(new AMQFrame(channelId, new AccessRequestBody(realm, exclusive, passive, active, write, read))); + _classId = classId; + _methodId = methodId; } @Override - public void receiveAccessRequestOk(final int channelId, final int ticket) + public boolean ignoreAllButCloseOk() { - _processedMethods.add(new AMQFrame(channelId, new AccessRequestOkBody(ticket))); + return false; } - @Override - public void receiveExchangeDeclare(final int channelId, - final AMQShortString exchange, - final AMQShortString type, - final boolean passive, - final boolean durable, - final boolean autoDelete, - final boolean internal, - final boolean nowait, final FieldTable arguments) + public int getClassId() { - _processedMethods.add(new AMQFrame(channelId, new ExchangeDeclareBody(0, exchange, type, passive, durable, autoDelete, internal, nowait, arguments))); + return _classId; } - @Override - public void receiveExchangeDeclareOk(final int channelId) + public int getMethodId() { - _processedMethods.add(new AMQFrame(channelId, new ExchangeDeclareOkBody())); + return _methodId; } - @Override - public void receiveExchangeDelete(final int channelId, - final AMQShortString exchange, - final boolean ifUnused, - final boolean nowait) + public static interface ClientAndServerChannelMethodProcessor extends ServerChannelMethodProcessor, ClientChannelMethodProcessor { - _processedMethods.add(new AMQFrame(channelId, new ExchangeDeleteBody(0, exchange, ifUnused, nowait))); - } - @Override - public void receiveExchangeDeleteOk(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, new ExchangeDeleteOkBody())); } - @Override - public void receiveExchangeBound(final int channelId, - final AMQShortString exchange, - final AMQShortString routingKey, - final AMQShortString queue) + private class FrameCreatingChannelMethodProcessor implements ClientAndServerChannelMethodProcessor { - _processedMethods.add(new AMQFrame(channelId, new ExchangeBoundBody(exchange, routingKey, queue))); - } + private final int _channelId; - @Override - public void receiveExchangeBoundOk(final int channelId, final int replyCode, final AMQShortString replyText) - { - _processedMethods.add(new AMQFrame(channelId, new ExchangeBoundOkBody(replyCode, replyText))); - } + private FrameCreatingChannelMethodProcessor(final int channelId) + { + _channelId = channelId; + } - @Override - public void receiveQueueBindOk(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, new QueueBindOkBody())); - } - @Override - public void receiveQueueUnbindOk(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, new QueueUnbindOkBody())); - } + @Override + public void receiveChannelOpenOk() + { + _processedMethods.add(new AMQFrame(_channelId, ProtocolVersion.v8_0.equals(getProtocolVersion()) + ? ChannelOpenOkBody.INSTANCE_0_8 + : ChannelOpenOkBody.INSTANCE_0_9)); + } - @Override - public void receiveQueueDeclare(final int channelId, - final AMQShortString queue, - final boolean passive, - final boolean durable, - final boolean exclusive, - final boolean autoDelete, - final boolean nowait, - final FieldTable arguments) - { - _processedMethods.add(new AMQFrame(channelId, new QueueDeclareBody(0, queue, passive, durable, exclusive, autoDelete, nowait, arguments))); - } + @Override + public void receiveChannelAlert(final int replyCode, final AMQShortString replyText, final FieldTable details) + { + _processedMethods.add(new AMQFrame(_channelId, new ChannelAlertBody(replyCode, replyText, details))); + } - @Override - public void receiveQueueDeclareOk(final int channelId, - final AMQShortString queue, - final long messageCount, - final long consumerCount) - { - _processedMethods.add(new AMQFrame(channelId, new QueueDeclareOkBody(queue, messageCount, consumerCount))); - } + @Override + public void receiveAccessRequestOk(final int ticket) + { + _processedMethods.add(new AMQFrame(_channelId, new AccessRequestOkBody(ticket))); + } - @Override - public void receiveQueueBind(final int channelId, - final AMQShortString queue, - final AMQShortString exchange, - final AMQShortString bindingKey, - final boolean nowait, - final FieldTable arguments) - { - _processedMethods.add(new AMQFrame(channelId, new QueueBindBody(0, queue, exchange, bindingKey, nowait, arguments))); - } + @Override + public void receiveExchangeDeclareOk() + { + _processedMethods.add(new AMQFrame(_channelId, new ExchangeDeclareOkBody())); + } - @Override - public void receiveQueuePurge(final int channelId, final AMQShortString queue, final boolean nowait) - { - _processedMethods.add(new AMQFrame(channelId, new QueuePurgeBody(0, queue, nowait))); - } + @Override + public void receiveExchangeDeleteOk() + { + _processedMethods.add(new AMQFrame(_channelId, new ExchangeDeleteOkBody())); + } - @Override - public void receiveQueuePurgeOk(final int channelId, final long messageCount) - { - _processedMethods.add(new AMQFrame(channelId, new QueuePurgeOkBody(messageCount))); - } + @Override + public void receiveExchangeBoundOk(final int replyCode, final AMQShortString replyText) + { + FrameCreatingMethodProcessor.this.receiveExchangeBoundOk(_channelId, replyCode, replyText); + } - @Override - public void receiveQueueDelete(final int channelId, - final AMQShortString queue, - final boolean ifUnused, - final boolean ifEmpty, - final boolean nowait) - { - _processedMethods.add(new AMQFrame(channelId, new QueueDeleteBody(0, queue, ifUnused, ifEmpty, nowait))); - } + @Override + public void receiveQueueBindOk() + { + _processedMethods.add(new AMQFrame(_channelId, new QueueBindOkBody())); + } - @Override - public void receiveQueueDeleteOk(final int channelId, final long messageCount) - { - _processedMethods.add(new AMQFrame(channelId, new QueueDeleteOkBody(messageCount))); - } + @Override + public void receiveQueueUnbindOk() + { + _processedMethods.add(new AMQFrame(_channelId, new QueueUnbindOkBody())); + } - @Override - public void receiveQueueUnbind(final int channelId, - final AMQShortString queue, - final AMQShortString exchange, - final AMQShortString bindingKey, - final FieldTable arguments) - { - _processedMethods.add(new AMQFrame(channelId, new QueueUnbindBody(0, queue, exchange, bindingKey, arguments))); - } + @Override + public void receiveQueueDeclareOk(final AMQShortString queue, final long messageCount, final long consumerCount) + { + _processedMethods.add(new AMQFrame(_channelId, new QueueDeclareOkBody(queue, messageCount, consumerCount))); + } - @Override - public void receiveBasicRecoverSyncOk(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, new BasicRecoverSyncOkBody(getProtocolVersion()))); - } + @Override + public void receiveQueuePurgeOk(final long messageCount) + { + _processedMethods.add(new AMQFrame(_channelId, new QueuePurgeOkBody(messageCount))); + } - @Override - public void receiveBasicRecover(final int channelId, final boolean requeue, final boolean sync) - { - if(ProtocolVersion.v8_0.equals(getProtocolVersion()) || !sync) + @Override + public void receiveQueueDeleteOk(final long messageCount) { - _processedMethods.add(new AMQFrame(channelId, new BasicRecoverBody(requeue))); + _processedMethods.add(new AMQFrame(_channelId, new QueueDeleteOkBody(messageCount))); } - else + + @Override + public void receiveBasicRecoverSyncOk() { - _processedMethods.add(new AMQFrame(channelId, new BasicRecoverSyncBody(getProtocolVersion(), requeue))); + _processedMethods.add(new AMQFrame(_channelId, new BasicRecoverSyncOkBody(getProtocolVersion()))); } - } - @Override - public void receiveBasicQos(final int channelId, - final long prefetchSize, - final int prefetchCount, - final boolean global) - { - _processedMethods.add(new AMQFrame(channelId, new BasicQosBody(prefetchSize, prefetchCount, global))); - } + @Override + public void receiveBasicQosOk() + { + _processedMethods.add(new AMQFrame(_channelId, new BasicQosOkBody())); + } - @Override - public void receiveBasicQosOk(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, new BasicQosOkBody())); - } + @Override + public void receiveBasicConsumeOk(final AMQShortString consumerTag) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicConsumeOkBody(consumerTag))); + } - @Override - public void receiveBasicConsume(final int channelId, - final AMQShortString queue, - final AMQShortString consumerTag, - final boolean noLocal, - final boolean noAck, - final boolean exclusive, - final boolean nowait, - final FieldTable arguments) - { - _processedMethods.add(new AMQFrame(channelId, new BasicConsumeBody(0, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments))); - } + @Override + public void receiveBasicCancelOk(final AMQShortString consumerTag) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicCancelOkBody(consumerTag))); + } - @Override - public void receiveBasicConsumeOk(final int channelId, final AMQShortString consumerTag) - { - _processedMethods.add(new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag))); - } + @Override + public void receiveBasicReturn(final int replyCode, + final AMQShortString replyText, + final AMQShortString exchange, + final AMQShortString routingKey) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicReturnBody(replyCode, + replyText, + exchange, + routingKey))); + } - @Override - public void receiveBasicCancel(final int channelId, final AMQShortString consumerTag, final boolean noWait) - { - _processedMethods.add(new AMQFrame(channelId, new BasicCancelBody(consumerTag, noWait))); - } + @Override + public void receiveBasicDeliver(final AMQShortString consumerTag, + final long deliveryTag, + final boolean redelivered, + final AMQShortString exchange, + final AMQShortString routingKey) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicDeliverBody(consumerTag, + deliveryTag, + redelivered, + exchange, + routingKey))); + } - @Override - public void receiveBasicCancelOk(final int channelId, final AMQShortString consumerTag) - { - _processedMethods.add(new AMQFrame(channelId, new BasicCancelOkBody(consumerTag))); - } + @Override + public void receiveBasicGetOk(final long deliveryTag, + final boolean redelivered, + final AMQShortString exchange, + final AMQShortString routingKey, + final long messageCount) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicGetOkBody(deliveryTag, + redelivered, + exchange, + routingKey, + messageCount))); + } - @Override - public void receiveBasicPublish(final int channelId, - final AMQShortString exchange, - final AMQShortString routingKey, - final boolean mandatory, - final boolean immediate) - { - _processedMethods.add(new AMQFrame(channelId, new BasicPublishBody(0, exchange, routingKey, mandatory, immediate))); - } + @Override + public void receiveBasicGetEmpty() + { + _processedMethods.add(new AMQFrame(_channelId, new BasicGetEmptyBody((AMQShortString)null))); + } - @Override - public void receiveBasicReturn(final int channelId, final int replyCode, - final AMQShortString replyText, - final AMQShortString exchange, - final AMQShortString routingKey) - { - _processedMethods.add(new AMQFrame(channelId, new BasicReturnBody(replyCode, replyText, exchange, routingKey))); - } + @Override + public void receiveTxSelectOk() + { + _processedMethods.add(new AMQFrame(_channelId, TxSelectOkBody.INSTANCE)); + } - @Override - public void receiveBasicDeliver(final int channelId, - final AMQShortString consumerTag, - final long deliveryTag, - final boolean redelivered, - final AMQShortString exchange, - final AMQShortString routingKey) - { - _processedMethods.add(new AMQFrame(channelId, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey))); - } + @Override + public void receiveTxCommitOk() + { + _processedMethods.add(new AMQFrame(_channelId, TxCommitOkBody.INSTANCE)); + } - @Override - public void receiveBasicGet(final int channelId, final AMQShortString queue, final boolean noAck) - { - _processedMethods.add(new AMQFrame(channelId, new BasicGetBody(0, queue, noAck))); - } + @Override + public void receiveTxRollbackOk() + { + _processedMethods.add(new AMQFrame(_channelId, TxRollbackOkBody.INSTANCE)); + } - @Override - public void receiveBasicGetOk(final int channelId, - final long deliveryTag, - final boolean redelivered, - final AMQShortString exchange, - final AMQShortString routingKey, - final long messageCount) - { - _processedMethods.add(new AMQFrame(channelId, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount))); - } + @Override + public void receiveAccessRequest(final AMQShortString realm, + final boolean exclusive, + final boolean passive, + final boolean active, + final boolean write, + final boolean read) + { + _processedMethods.add(new AMQFrame(_channelId, new AccessRequestBody(realm, + exclusive, + passive, + active, + write, + read))); + } - @Override - public void receiveBasicGetEmpty(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, new BasicGetEmptyBody((AMQShortString)null))); - } + @Override + public void receiveExchangeDeclare(final AMQShortString exchange, + final AMQShortString type, + final boolean passive, + final boolean durable, + final boolean autoDelete, + final boolean internal, + final boolean nowait, + final FieldTable arguments) + { + _processedMethods.add(new AMQFrame(_channelId, new ExchangeDeclareBody(0, + exchange, + type, + passive, + durable, + autoDelete, + internal, + nowait, + arguments))); + } - @Override - public void receiveBasicAck(final int channelId, final long deliveryTag, final boolean multiple) - { - _processedMethods.add(new AMQFrame(channelId, new BasicAckBody(deliveryTag, multiple))); - } + @Override + public void receiveExchangeDelete(final AMQShortString exchange, final boolean ifUnused, final boolean nowait) + { + _processedMethods.add(new AMQFrame(_channelId, new ExchangeDeleteBody(0, exchange, ifUnused, nowait))); + } - @Override - public void receiveBasicReject(final int channelId, final long deliveryTag, final boolean requeue) - { - _processedMethods.add(new AMQFrame(channelId, new BasicRejectBody(deliveryTag, requeue))); - } + @Override + public void receiveExchangeBound(final AMQShortString exchange, + final AMQShortString routingKey, + final AMQShortString queue) + { + _processedMethods.add(new AMQFrame(_channelId, new ExchangeBoundBody(exchange, routingKey, queue))); + } - @Override - public void receiveHeartbeat() - { - _processedMethods.add(new AMQFrame(0, new HeartbeatBody())); - } + @Override + public void receiveQueueDeclare(final AMQShortString queue, + final boolean passive, + final boolean durable, + final boolean exclusive, + final boolean autoDelete, + final boolean nowait, + final FieldTable arguments) + { + _processedMethods.add(new AMQFrame(_channelId, new QueueDeclareBody(0, + queue, + passive, + durable, + exclusive, + autoDelete, + nowait, + arguments))); + } - @Override - public ProtocolVersion getProtocolVersion() - { - return _protocolVersion; - } + @Override + public void receiveQueueBind(final AMQShortString queue, + final AMQShortString exchange, + final AMQShortString bindingKey, + final boolean nowait, + final FieldTable arguments) + { + _processedMethods.add(new AMQFrame(_channelId, new QueueBindBody(0, + queue, + exchange, + bindingKey, + nowait, + arguments))); + } - public void setProtocolVersion(final ProtocolVersion protocolVersion) - { - _protocolVersion = protocolVersion; - } + @Override + public void receiveQueuePurge(final AMQShortString queue, final boolean nowait) + { + _processedMethods.add(new AMQFrame(_channelId, new QueuePurgeBody(0, queue, nowait))); + } - @Override - public void receiveMessageContent(final int channelId, final byte[] data) - { - _processedMethods.add(new AMQFrame(channelId, new ContentBody(data))); - } + @Override + public void receiveQueueDelete(final AMQShortString queue, + final boolean ifUnused, + final boolean ifEmpty, + final boolean nowait) + { + _processedMethods.add(new AMQFrame(_channelId, new QueueDeleteBody(0, queue, ifUnused, ifEmpty, nowait))); + } - @Override - public void receiveMessageHeader(final int channelId, - final BasicContentHeaderProperties properties, - final long bodySize) - { - _processedMethods.add(new AMQFrame(channelId, new ContentHeaderBody(properties, bodySize))); - } + @Override + public void receiveQueueUnbind(final AMQShortString queue, + final AMQShortString exchange, + final AMQShortString bindingKey, + final FieldTable arguments) + { + _processedMethods.add(new AMQFrame(_channelId, new QueueUnbindBody(0, + queue, + exchange, + bindingKey, + arguments))); + } - @Override - public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation) - { - _processedMethods.add(protocolInitiation); - } + @Override + public void receiveBasicRecover(final boolean requeue, final boolean sync) + { + if(ProtocolVersion.v8_0.equals(getProtocolVersion()) || !sync) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicRecoverBody(requeue))); + } + else + { + _processedMethods.add(new AMQFrame(_channelId, new BasicRecoverSyncBody(getProtocolVersion(), requeue))); + } + } - @Override - public void setCurrentMethod(final int classId, final int methodId) - { - _classId = classId; - _methodId = methodId; - } + @Override + public void receiveBasicQos(final long prefetchSize, final int prefetchCount, final boolean global) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicQosBody(prefetchSize, prefetchCount, global))); + } - public int getClassId() - { - return _classId; - } + @Override + public void receiveBasicConsume(final AMQShortString queue, + final AMQShortString consumerTag, + final boolean noLocal, + final boolean noAck, + final boolean exclusive, + final boolean nowait, + final FieldTable arguments) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicConsumeBody(0, + queue, + consumerTag, + noLocal, + noAck, + exclusive, + nowait, + arguments))); + } - public int getMethodId() - { - return _methodId; + @Override + public void receiveBasicCancel(final AMQShortString consumerTag, final boolean noWait) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicCancelBody(consumerTag, noWait))); + } + + @Override + public void receiveBasicPublish(final AMQShortString exchange, + final AMQShortString routingKey, + final boolean mandatory, + final boolean immediate) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicPublishBody(0, + exchange, + routingKey, + mandatory, + immediate))); + } + + @Override + public void receiveBasicGet(final AMQShortString queue, final boolean noAck) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicGetBody(0, queue, noAck))); + } + + @Override + public void receiveBasicAck(final long deliveryTag, final boolean multiple) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicAckBody(deliveryTag, multiple))); + } + + @Override + public void receiveBasicReject(final long deliveryTag, final boolean requeue) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicRejectBody(deliveryTag, requeue))); + } + + @Override + public void receiveTxSelect() + { + _processedMethods.add(new AMQFrame(_channelId, TxSelectBody.INSTANCE)); + } + + @Override + public void receiveTxCommit() + { + _processedMethods.add(new AMQFrame(_channelId, TxCommitBody.INSTANCE)); + } + + @Override + public void receiveTxRollback() + { + _processedMethods.add(new AMQFrame(_channelId, TxRollbackBody.INSTANCE)); + } + + @Override + public void receiveChannelFlow(final boolean active) + { + _processedMethods.add(new AMQFrame(_channelId, new ChannelFlowBody(active))); + } + + @Override + public void receiveChannelFlowOk(final boolean active) + { + _processedMethods.add(new AMQFrame(_channelId, new ChannelFlowOkBody(active))); + } + + @Override + public void receiveChannelClose(final int replyCode, + final AMQShortString replyText, + final int classId, + final int methodId) + { + _processedMethods.add(new AMQFrame(_channelId, new ChannelCloseBody(replyCode, replyText, classId, methodId))); + } + + @Override + public void receiveChannelCloseOk() + { + _processedMethods.add(new AMQFrame(_channelId, ChannelCloseOkBody.INSTANCE)); + } + + @Override + public void receiveMessageContent(final byte[] data) + { + _processedMethods.add(new AMQFrame(_channelId, new ContentBody(data))); + } + + @Override + public void receiveMessageHeader(final BasicContentHeaderProperties properties, final long bodySize) + { + _processedMethods.add(new AMQFrame(_channelId, new ContentHeaderBody(properties, bodySize))); + } + + @Override + public boolean ignoreAllButCloseOk() + { + return false; + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java index 0b08059631..62c0cd3c6d 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java +++ b/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java @@ -20,184 +20,21 @@ */ package org.apache.qpid.framing; -public interface MethodProcessor +public interface MethodProcessor<T extends ChannelMethodProcessor> { ProtocolVersion getProtocolVersion(); - void receiveConnectionStart(short versionMajor, - short versionMinor, - FieldTable serverProperties, - byte[] mechanisms, - byte[] locales); - - void receiveConnectionStartOk(FieldTable clientProperties, - AMQShortString mechanism, - byte[] response, - AMQShortString locale); - - void receiveTxSelect(int channelId); - - void receiveTxSelectOk(int channelId); - - void receiveTxCommit(int channelId); - - void receiveTxCommitOk(int channelId); - - void receiveTxRollback(int channelId); - - void receiveTxRollbackOk(int channelId); - - void receiveConnectionSecure(byte[] challenge); - - void receiveConnectionSecureOk(byte[] response); - - void receiveConnectionTune(int channelMax, long frameMax, int heartbeat); - - void receiveConnectionTuneOk(int channelMax, long frameMax, int heartbeat); - - void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist); - - void receiveConnectionOpenOk(AMQShortString knownHosts); - - void receiveConnectionRedirect(AMQShortString host, AMQShortString knownHosts); + T getChannelMethodProcessor(int channelId); void receiveConnectionClose(int replyCode, AMQShortString replyText, int classId, int methodId); void receiveConnectionCloseOk(); - void receiveChannelOpen(int channelId); - - void receiveChannelOpenOk(int channelId); - - void receiveChannelFlow(int channelId, boolean active); - - void receiveChannelFlowOk(int channelId, boolean active); - - void receiveChannelAlert(int channelId, int replyCode, final AMQShortString replyText, FieldTable details); - - void receiveChannelClose(int channelId, int replyCode, AMQShortString replyText, int classId, int methodId); - - void receiveChannelCloseOk(int channelId); - - void receiveAccessRequest(int channelId, - AMQShortString realm, - boolean exclusive, - boolean passive, - boolean active, - boolean write, boolean read); - - void receiveAccessRequestOk(int channelId, int ticket); - - void receiveExchangeDeclare(int channelId, - AMQShortString exchange, - AMQShortString type, - boolean passive, - boolean durable, - boolean autoDelete, boolean internal, boolean nowait, final FieldTable arguments); - - void receiveExchangeDeclareOk(int channelId); - - void receiveExchangeDelete(int channelId, AMQShortString exchange, boolean ifUnused, boolean nowait); - - void receiveExchangeDeleteOk(int channelId); - - void receiveExchangeBound(int channelId, AMQShortString exchange, AMQShortString routingKey, AMQShortString queue); - - void receiveExchangeBoundOk(int channelId, int replyCode, AMQShortString replyText); - - void receiveQueueBindOk(int channelId); - - void receiveQueueUnbindOk(final int channelId); - - void receiveQueueDeclare(int channelId, - AMQShortString queue, - boolean passive, - boolean durable, - boolean exclusive, - boolean autoDelete, boolean nowait, FieldTable arguments); - - void receiveQueueDeclareOk(int channelId, final AMQShortString queue, long messageCount, long consumerCount); - - void receiveQueueBind(int channelId, - AMQShortString queue, - AMQShortString exchange, - AMQShortString bindingKey, - boolean nowait, FieldTable arguments); - - void receiveQueuePurge(int channelId, AMQShortString queue, boolean nowait); - - void receiveQueuePurgeOk(int channelId, long messageCount); - - void receiveQueueDelete(int channelId, AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait); - - void receiveQueueDeleteOk(int channelId, long messageCount); - - void receiveQueueUnbind(int channelId, - AMQShortString queue, - AMQShortString exchange, - AMQShortString bindingKey, - FieldTable arguments); - - void receiveBasicRecoverSyncOk(int channelId); - - void receiveBasicRecover(int channelId, final boolean requeue, boolean sync); - - void receiveBasicQos(int channelId, long prefetchSize, int prefetchCount, boolean global); - - void receiveBasicQosOk(int channelId); - - void receiveBasicConsume(int channelId, - AMQShortString queue, - AMQShortString consumerTag, - boolean noLocal, - boolean noAck, - boolean exclusive, boolean nowait, FieldTable arguments); - - void receiveBasicConsumeOk(int channelId, AMQShortString consumerTag); - - void receiveBasicCancel(int channelId, AMQShortString consumerTag, boolean noWait); - - void receiveBasicCancelOk(int channelId, AMQShortString consumerTag); - - void receiveBasicPublish(int channelId, - AMQShortString exchange, - AMQShortString routingKey, - boolean mandatory, - boolean immediate); - - void receiveBasicReturn(final int channelId, - int replyCode, - AMQShortString replyText, - AMQShortString exchange, - AMQShortString routingKey); - - void receiveBasicDeliver(int channelId, - AMQShortString consumerTag, - long deliveryTag, - boolean redelivered, - AMQShortString exchange, AMQShortString routingKey); - - void receiveBasicGet(int channelId, AMQShortString queue, boolean noAck); - - void receiveBasicGetOk(int channelId, - long deliveryTag, - boolean redelivered, - AMQShortString exchange, - AMQShortString routingKey, long messageCount); - - void receiveBasicGetEmpty(int channelId); - - void receiveBasicAck(int channelId, long deliveryTag, boolean multiple); - - void receiveBasicReject(int channelId, long deliveryTag, boolean requeue); - void receiveHeartbeat(); - void receiveMessageContent(int channelId, byte[] data); - - void receiveMessageHeader(int channelId, BasicContentHeaderProperties properties, long bodySize); - void receiveProtocolHeader(ProtocolInitiation protocolInitiation); void setCurrentMethod(int classId, int methodId); + + boolean ignoreAllButCloseOk(); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java index e4419f77e3..2b7e26a7f0 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java @@ -165,9 +165,8 @@ public class QueueBindBody extends AMQMethodBodyImpl implements EncodableAMQData return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { int ticket = buffer.readUnsignedShort(); @@ -176,6 +175,9 @@ public class QueueBindBody extends AMQMethodBodyImpl implements EncodableAMQData AMQShortString bindingKey = buffer.readAMQShortString(); boolean nowait = (buffer.readByte() & 0x01) == 0x01; FieldTable arguments = EncodingUtils.readFieldTable(buffer); - dispatcher.receiveQueueBind(channelId, queue, exchange, bindingKey, nowait, arguments); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveQueueBind(queue, exchange, bindingKey, nowait, arguments); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java index 1f9888c76a..5a359dc8df 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java @@ -191,9 +191,8 @@ public class QueueDeclareBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { int ticket = buffer.readUnsignedShort(); @@ -206,6 +205,9 @@ public class QueueDeclareBody extends AMQMethodBodyImpl implements EncodableAMQD boolean autoDelete = (bitfield & 0x08 ) == 0x08; boolean nowait = (bitfield & 0x010 ) == 0x010; FieldTable arguments = EncodingUtils.readFieldTable(buffer); - dispatcher.receiveQueueDeclare(channelId, queue, passive, durable, exclusive, autoDelete, nowait, arguments); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveQueueDeclare(queue, passive, durable, exclusive, autoDelete, nowait, arguments); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java index 9857bb3a39..cf6fc656b3 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java @@ -120,13 +120,15 @@ public class QueueDeclareOkBody extends AMQMethodBodyImpl implements EncodableAM return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ClientChannelMethodProcessor dispatcher) throws IOException { AMQShortString queue = buffer.readAMQShortString(); long messageCount = EncodingUtils.readUnsignedInteger(buffer); long consumerCount = EncodingUtils.readUnsignedInteger(buffer); - dispatcher.receiveQueueDeclareOk(channelId, queue, messageCount, consumerCount); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveQueueDeclareOk(queue, messageCount, consumerCount); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java index 408f9f9667..ea933dc644 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java @@ -151,9 +151,8 @@ public class QueueDeleteBody extends AMQMethodBodyImpl implements EncodableAMQDa return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException { int ticket = buffer.readUnsignedShort(); @@ -163,6 +162,9 @@ public class QueueDeleteBody extends AMQMethodBodyImpl implements EncodableAMQDa boolean ifUnused = (bitfield & 0x01) == 0x01; boolean ifEmpty = (bitfield & 0x02) == 0x02; boolean nowait = (bitfield & 0x04) == 0x04; - dispatcher.receiveQueueDelete(channelId, queue, ifUnused, ifEmpty, nowait); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveQueueDelete(queue, ifUnused, ifEmpty, nowait); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java index b43369b68a..6d50153c15 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java @@ -95,11 +95,13 @@ public class QueueDeleteOkBody extends AMQMethodBodyImpl implements EncodableAMQ return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ClientChannelMethodProcessor dispatcher) throws IOException { long messageCount = EncodingUtils.readUnsignedInteger(buffer); - dispatcher.receiveQueueDeleteOk(channelId, messageCount); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveQueueDeleteOk(messageCount); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java index 5a04e21355..58a424387c 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java @@ -125,14 +125,16 @@ public class QueuePurgeBody extends AMQMethodBodyImpl implements EncodableAMQDat return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException { int ticket = buffer.readUnsignedShort(); AMQShortString queue = buffer.readAMQShortString(); boolean nowait = (buffer.readByte() & 0x01) == 0x01; - dispatcher.receiveQueuePurge(channelId, queue, nowait); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveQueuePurge(queue, nowait); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java index 40cac8b390..acab2bc052 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java @@ -95,11 +95,13 @@ public class QueuePurgeOkBody extends AMQMethodBodyImpl implements EncodableAMQD return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ClientChannelMethodProcessor dispatcher) throws IOException { long messageCount = EncodingUtils.readUnsignedInteger(buffer); - dispatcher.receiveQueuePurgeOk(channelId, messageCount); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveQueuePurgeOk(messageCount); + } } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java b/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java index a6f3e5b4c5..30c5d19d27 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java @@ -147,9 +147,8 @@ public class QueueUnbindBody extends AMQMethodBodyImpl implements EncodableAMQDa return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { int ticket = buffer.readUnsignedShort(); @@ -157,6 +156,9 @@ public class QueueUnbindBody extends AMQMethodBodyImpl implements EncodableAMQDa AMQShortString exchange = buffer.readAMQShortString(); AMQShortString routingKey = buffer.readAMQShortString(); FieldTable arguments = EncodingUtils.readFieldTable(buffer); - dispatcher.receiveQueueUnbind(channelId, queue, exchange, routingKey, arguments); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveQueueUnbind(queue, exchange, routingKey, arguments); + } } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java index d4c7f151e7..89b75c2d2f 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessor.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java @@ -18,92 +18,75 @@ * under the License. * */ -package org.apache.qpid.server.protocol.v0_8; +package org.apache.qpid.framing; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; - -public interface ChannelMethodProcessor +public interface ServerChannelMethodProcessor extends ChannelMethodProcessor { void receiveAccessRequest(AMQShortString realm, boolean exclusive, boolean passive, boolean active, - boolean write, - boolean read); - - void receiveBasicAck(long deliveryTag, boolean multiple); - - void receiveBasicCancel(AMQShortString consumerTag, boolean nowait); - - void receiveBasicConsume(AMQShortString queue, - AMQShortString consumerTag, - boolean noLocal, - boolean noAck, - boolean exclusive, - boolean nowait, - FieldTable arguments); - - void receiveBasicGet(AMQShortString queue, boolean noAck); - - void receiveBasicPublish(AMQShortString exchange, - AMQShortString routingKey, - boolean mandatory, - boolean immediate); - - void receiveBasicQos(long prefetchSize, int prefetchCount, boolean global); - - void receiveBasicRecover(boolean requeue, boolean sync); - - void receiveBasicReject(long deliveryTag, boolean requeue); - - void receiveChannelClose(); - - void receiveChannelCloseOk(); - - void receiveChannelFlow(boolean active); - - void receiveExchangeBound(AMQShortString exchange, AMQShortString queue, AMQShortString routingKey); + boolean write, boolean read); void receiveExchangeDeclare(AMQShortString exchange, AMQShortString type, boolean passive, boolean durable, - boolean autoDelete, - boolean internal, - boolean nowait, - FieldTable arguments); + boolean autoDelete, boolean internal, boolean nowait, final FieldTable arguments); void receiveExchangeDelete(AMQShortString exchange, boolean ifUnused, boolean nowait); - void receiveQueueBind(AMQShortString queue, - AMQShortString exchange, - AMQShortString routingKey, - boolean nowait, - FieldTable arguments); + void receiveExchangeBound(AMQShortString exchange, AMQShortString routingKey, AMQShortString queue); - void receiveQueueDeclare(AMQShortString queueStr, + void receiveQueueDeclare(AMQShortString queue, boolean passive, boolean durable, boolean exclusive, - boolean autoDelete, - boolean nowait, - FieldTable arguments); + boolean autoDelete, boolean nowait, FieldTable arguments); - void receiveQueueDelete(AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait); + void receiveQueueBind(AMQShortString queue, + AMQShortString exchange, + AMQShortString bindingKey, + boolean nowait, FieldTable arguments); void receiveQueuePurge(AMQShortString queue, boolean nowait); + void receiveQueueDelete(AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait); + void receiveQueueUnbind(AMQShortString queue, AMQShortString exchange, - AMQShortString routingKey, + AMQShortString bindingKey, FieldTable arguments); + void receiveBasicRecover(final boolean requeue, boolean sync); + + void receiveBasicQos(long prefetchSize, int prefetchCount, boolean global); + + void receiveBasicConsume(AMQShortString queue, + AMQShortString consumerTag, + boolean noLocal, + boolean noAck, + boolean exclusive, boolean nowait, FieldTable arguments); + + void receiveBasicCancel(AMQShortString consumerTag, boolean noWait); + + void receiveBasicPublish(AMQShortString exchange, + AMQShortString routingKey, + boolean mandatory, + boolean immediate); + + void receiveBasicGet(AMQShortString queue, boolean noAck); + + void receiveBasicAck(long deliveryTag, boolean multiple); + + void receiveBasicReject(long deliveryTag, boolean requeue); + + + void receiveTxSelect(); void receiveTxCommit(); void receiveTxRollback(); - } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConnectionMethodProcessor.java b/java/common/src/main/java/org/apache/qpid/framing/ServerMethodProcessor.java index 6e657c022e..77b4a1fc6b 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConnectionMethodProcessor.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ServerMethodProcessor.java @@ -18,27 +18,22 @@ * under the License. * */ -package org.apache.qpid.server.protocol.v0_8; +package org.apache.qpid.framing; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; - -public interface ConnectionMethodProcessor +public interface ServerMethodProcessor<T extends ServerChannelMethodProcessor> extends MethodProcessor<T> { - void receiveChannelOpen(int channelId); - - void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist); - - void receiveConnectionClose(int replyCode, AMQShortString replyText, int classId, int methodId); - - void receiveConnectionCloseOk(); - - void receiveConnectionSecureOk(byte[] response); - void receiveConnectionStartOk(FieldTable clientProperties, AMQShortString mechanism, byte[] response, AMQShortString locale); + void receiveConnectionSecureOk(byte[] response); + void receiveConnectionTuneOk(int channelMax, long frameMax, int heartbeat); + + void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist); + + void receiveChannelOpen(int channelId); + + } diff --git a/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java b/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java index bd3e9bbcbc..61d5f0629c 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java +++ b/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java @@ -20,10 +20,10 @@ */ package org.apache.qpid.transport.util; -import java.nio.ByteBuffer; - import static java.lang.Math.min; +import java.nio.ByteBuffer; + /** * Functions @@ -33,6 +33,9 @@ import static java.lang.Math.min; public final class Functions { + private static final char[] HEX_CHARACTERS = + {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; + private Functions() { } @@ -102,4 +105,21 @@ public final class Functions return str(ByteBuffer.wrap(bytes), limit); } + public static String hex(byte[] bytes, int limit) + { + limit = Math.min(limit, bytes == null ? 0 : bytes.length); + StringBuilder sb = new StringBuilder(3 + limit*2); + for(int i = 0; i < limit; i++) + { + sb.append(HEX_CHARACTERS[(((int)bytes[i]) & 0xf0)>>4]); + sb.append(HEX_CHARACTERS[(((int)bytes[i]) & 0x0f)]); + + } + if(bytes != null && bytes.length>limit) + { + sb.append("..."); + } + return sb.toString(); + } + } diff --git a/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java b/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java index 63696515c6..51f3ce1113 100644 --- a/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java +++ b/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java @@ -47,7 +47,7 @@ public class AMQDecoderTest extends TestCase public void setUp() { _methodProcessor = new FrameCreatingMethodProcessor(ProtocolVersion.v0_91); - _decoder = new AMQDecoder(false, _methodProcessor); + _decoder = new ClientDecoder(_methodProcessor); } diff --git a/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java b/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java index b4a8155978..f76203887c 100644 --- a/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java +++ b/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; @@ -40,13 +41,13 @@ import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; +import org.apache.qpid.codec.AMQDecoder; +import org.apache.qpid.codec.ClientDecoder; import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQDataBlockDecoder; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQFrameDecodingException; import org.apache.qpid.framing.AMQProtocolVersionException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ByteArrayDataInput; import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.ConnectionStartOkBody; import org.apache.qpid.framing.ConnectionTuneOkBody; @@ -234,14 +235,9 @@ public class MaxFrameSizeTest extends QpidBrokerTestCase } byte[] serverData = baos.toByteArray(); - ByteArrayDataInput badi = new ByteArrayDataInput(serverData); - AMQDataBlockDecoder datablockDecoder = new AMQDataBlockDecoder(); final FrameCreatingMethodProcessor methodProcessor = new FrameCreatingMethodProcessor(ProtocolVersion.v0_91); - - while (datablockDecoder.decodable(badi)) - { - datablockDecoder.processInput(methodProcessor, badi); - } + AMQDecoder decoder = new ClientDecoder(methodProcessor); + decoder.decodeBuffer(ByteBuffer.wrap(serverData)); evaluator.evaluate(socket, methodProcessor.getProcessedMethods()); } |