diff options
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org')
8 files changed, 255 insertions, 2318 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 27fa654843..4087b1f4a0 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -115,7 +115,7 @@ import org.apache.qpid.transport.TransportException; public class AMQChannel implements AMQSessionModel<AMQChannel, AMQProtocolEngine>, AsyncAutoCommitTransaction.FutureRecorder, - ChannelMethodProcessor + ServerChannelMethodProcessor { public static final int DEFAULT_PREFETCH = 4096; @@ -376,27 +376,18 @@ public class AMQChannel } public void publishContentHeader(ContentHeaderBody contentHeaderBody) - throws AMQException { - if (_currentMessage == null) + if (_logger.isDebugEnabled()) { - throw new AMQException("Received content header without previously receiving a BasicPublish frame"); + _logger.debug("Content header received on channel " + _channelId); } - else - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Content header received on channel " + _channelId); - } - _currentMessage.setContentHeaderBody(contentHeaderBody); + _currentMessage.setContentHeaderBody(contentHeaderBody); - deliverCurrentMessageIfComplete(); - } + deliverCurrentMessageIfComplete(); } private void deliverCurrentMessageIfComplete() - throws AMQException { // check and deliver if header says body length is zero if (_currentMessage.allContentReceived()) @@ -497,7 +488,7 @@ public class AMQChannel * @throws AMQConnectionException if the message is mandatory close-on-no-route * @see AMQProtocolEngine#isCloseWhenNoRoute() */ - private void handleUnroutableMessage(AMQMessage message) throws AMQConnectionException + private void handleUnroutableMessage(AMQMessage message) { boolean mandatory = message.isMandatory(); String description = currentMessageDescription(); @@ -512,26 +503,27 @@ public class AMQChannel if (mandatory && isTransactional() && _connection.isCloseWhenNoRoute()) { - throw new AMQConnectionException( - AMQConstant.NO_ROUTE, - "No route for message " + currentMessageDescription(), - 0, 0, // default class and method ids - getConnection().getMethodRegistry(), - (Throwable) null); - } - - if (mandatory || message.isImmediate()) - { - _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " + currentMessageDescription(), message)); + _connection.closeConnection(AMQConstant.NO_ROUTE, + "No route for message " + currentMessageDescription(), _channelId); } else { - AMQShortString exchangeName = _currentMessage.getExchangeName(); - AMQShortString routingKey = _currentMessage.getMessagePublishInfo().getRoutingKey(); + if (mandatory || message.isImmediate()) + { + _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, + "No Route for message " + + currentMessageDescription(), + message)); + } + else + { + AMQShortString exchangeName = _currentMessage.getExchangeName(); + AMQShortString routingKey = _currentMessage.getMessagePublishInfo().getRoutingKey(); - getVirtualHost().getEventLogger().message( - ExchangeMessages.DISCARDMSG(exchangeName == null ? null : exchangeName.asString(), - routingKey == null ? null : routingKey.asString())); + getVirtualHost().getEventLogger().message( + ExchangeMessages.DISCARDMSG(exchangeName == null ? null : exchangeName.asString(), + routingKey == null ? null : routingKey.asString())); + } } } @@ -550,13 +542,8 @@ public class AMQChannel : _currentMessage.getMessagePublishInfo().getRoutingKey().toString()); } - public void publishContentBody(ContentBody contentBody) throws AMQException + public void publishContentBody(ContentBody contentBody) { - if (_currentMessage == null) - { - throw new AMQException("Received content body without previously receiving a Content Header"); - } - if (_logger.isDebugEnabled()) { _logger.debug(debugIdentity() + " content body received on channel " + _channelId); @@ -568,13 +555,6 @@ public class AMQChannel deliverCurrentMessageIfComplete(); } - catch (AMQException e) - { - // we want to make sure we don't keep a reference to the message in the - // event of an error - _currentMessage = null; - throw e; - } catch (RuntimeException e) { // we want to make sure we don't keep a reference to the message in the @@ -1277,14 +1257,10 @@ public class AMQChannel private AMQMessage createAMQMessage(IncomingMessage incomingMessage, StoredMessage<MessageMetaData> handle) - throws AMQException { AMQMessage message = new AMQMessage(handle, _connection.getReference()); - final BasicContentHeaderProperties properties = - incomingMessage.getContentHeader().getProperties(); - return message; } @@ -1340,6 +1316,11 @@ public class AMQChannel return _subject; } + public boolean hasCurrentMessage() + { + return _currentMessage != null; + } + private class GetDeliveryMethod implements ClientDeliveryMethod { @@ -2242,7 +2223,10 @@ public class AMQChannel } @Override - public void receiveChannelClose() + public void receiveChannelClose(final int replyCode, + final AMQShortString replyText, + final int classId, + final int methodId) { sync(); _connection.closeChannel(this); @@ -2258,6 +2242,43 @@ public class AMQChannel } @Override + public void receiveMessageContent(final byte[] data) + { + + if(hasCurrentMessage()) + { + publishContentBody(new ContentBody(data)); + } + else + { + _connection.closeConnection(AMQConstant.COMMAND_INVALID, + "Attempt to send a content header without first sending a publish frame", + _channelId); + } + } + + @Override + public void receiveMessageHeader(final BasicContentHeaderProperties properties, final long bodySize) + { + if(hasCurrentMessage()) + { + publishContentHeader(new ContentHeaderBody(properties, bodySize)); + } + else + { + _connection.closeConnection(AMQConstant.COMMAND_INVALID, + "Attempt to send a content header without first sending a publish frame", + _channelId); + } + } + + @Override + public boolean ignoreAllButCloseOk() + { + return _connection.ignoreAllButCloseOk() || _connection.channelAwaitingClosure(_channelId); + } + + @Override public void receiveChannelFlow(final boolean active) { sync(); @@ -2270,9 +2291,15 @@ public class AMQChannel } @Override + public void receiveChannelFlowOk(final boolean active) + { + // TODO - should we do anything here? + } + + @Override public void receiveExchangeBound(final AMQShortString exchangeName, - final AMQShortString queueName, - final AMQShortString routingKey) + final AMQShortString routingKey, + final AMQShortString queueName) { VirtualHostImpl virtualHost = _connection.getVirtualHost(); MethodRegistry methodRegistry = _connection.getMethodRegistry(); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index dce0c3128e..3a4e780db6 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -21,6 +21,9 @@ package org.apache.qpid.server.protocol.v0_8; import java.io.IOException; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -28,8 +31,6 @@ import java.security.AccessControlException; import java.security.AccessController; import java.security.Principal; import java.security.PrivilegedAction; -import java.security.PrivilegedActionException; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -49,7 +50,6 @@ import javax.security.sasl.SaslServer; import org.apache.log4j.Logger; -import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQDecoder; @@ -58,8 +58,6 @@ import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.framing.*; import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.connection.ConnectionPrincipal; @@ -93,8 +91,7 @@ import org.apache.qpid.util.BytesDataOutput; public class AMQProtocolEngine implements ServerProtocolEngine, AMQConnectionModel<AMQProtocolEngine, AMQChannel>, - AMQVersionAwareProtocolSession, - ConnectionMethodProcessor + ServerMethodProcessor<ServerChannelMethodProcessor> { private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class); @@ -114,9 +111,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private VirtualHostImpl<?,?,?> _virtualHost; private final Map<Integer, AMQChannel> _channelMap = - new HashMap<Integer, AMQChannel>(); + new HashMap<>(); private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners = - new CopyOnWriteArrayList<SessionModelListener>(); + new CopyOnWriteArrayList<>(); private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1]; @@ -128,7 +125,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, * Thread-safety: guarded by {@link #_receivedLock}. */ private final Set<AMQChannel> _channelsForCurrentMessage = - new HashSet<AMQChannel>(); + new HashSet<>(); private AMQDecoder _decoder; @@ -142,14 +139,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, /* AMQP Version for this session */ private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); private final MethodRegistry _methodRegistry = new MethodRegistry(_protocolVersion); - private final FrameCreatingMethodProcessor _methodProcessor = new FrameCreatingMethodProcessor(_protocolVersion); private final List<Action<? super AMQProtocolEngine>> _taskList = - new CopyOnWriteArrayList<Action<? super AMQProtocolEngine>>(); + new CopyOnWriteArrayList<>(); - private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>(); + private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<>(); private ProtocolOutputConverter _protocolOutputConverter; private final Subject _authorizedSubject = new Subject(); - private MethodDispatcher _dispatcher; private final long _connectionID; private Object _reference = new Object(); @@ -183,6 +178,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private boolean _authenticated; private boolean _compressionSupported; private int _messageCompressionThreshold; + private int _currentClassId; + private int _currentMethodId; public AMQProtocolEngine(Broker broker, final NetworkConnection network, @@ -195,7 +192,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _transport = transport; _maxNoOfChannels = broker.getConnection_sessionCountLimit(); _receivedLock = new ReentrantLock(); - _decoder = new AMQDecoder(true, _methodProcessor); + _decoder = new BrokerDecoder(this); _connectionID = connectionId; _logSubject = new ConnectionLogSubject(this); @@ -306,32 +303,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _readBytes += msg.remaining(); _receivedLock.lock(); - List<AMQDataBlock> processedMethods = _methodProcessor.getProcessedMethods(); try { _decoder.decodeBuffer(msg); - for (AMQDataBlock dataBlock : processedMethods) - { - try - { - dataBlockReceived(dataBlock); - } - catch(AMQConnectionException e) - { - if(_logger.isDebugEnabled()) - { - _logger.debug("Caught AMQConnectionException but will simply stop processing data blocks - the connection should already be closed.", e); - } - break; - } - catch (AMQException e) - { - _logger.error("Unexpected exception when processing datablock", e); - closeProtocolSession(); - break; - } - } - processedMethods.clear(); receivedComplete(); } catch (ConnectionScopedRuntimeException e) @@ -361,7 +335,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } finally { - processedMethods.clear(); _receivedLock.unlock(); } return null; @@ -399,112 +372,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } } - /** - * Process the data block. - * If the message is for a channel it is added to {@link #_channelsForCurrentMessage}. - * - * @throws AMQConnectionException if unable to process the data block. In this case, - * the connection is already closed by the time the exception is thrown. If any other - * type of exception is thrown, the connection is not already closed. - */ - private void dataBlockReceived(AMQDataBlock message) throws AMQException - { - if (message instanceof ProtocolInitiation) - { - protocolInitiationReceived((ProtocolInitiation) message); - - } - else if (message instanceof AMQFrame) - { - AMQFrame frame = (AMQFrame) message; - frameReceived(frame); - } - else - { - throw new AMQException("Unknown message type: " + message.getClass().getName() + ": " + message); - } - } - - /** - * Handle the supplied frame. - * Adds this frame's channel to {@link #_channelsForCurrentMessage}. - * - * @throws AMQConnectionException if unable to process the data block. In this case, - * the connection is already closed by the time the exception is thrown. If any other - * type of exception is thrown, the connection is not already closed. - */ - private void frameReceived(AMQFrame frame) throws AMQException + void channelRequiresSync(final AMQChannel amqChannel) { - int channelId = frame.getChannel(); - AMQChannel amqChannel = _channelMap.get(channelId); - if(amqChannel != null) - { - // The _receivedLock is already acquired in the caller - // It is safe to add channel - _channelsForCurrentMessage.add(amqChannel); - } - else - { - // Not an error. The frame is probably a channel Open for this channel id, which - // does not require asynchronous work therefore its absence from - // _channelsForCurrentMessage is ok. - } - - AMQBody body = frame.getBodyFrame(); - - long startTime = 0; - String frameToString = null; - if (_logger.isDebugEnabled()) - { - startTime = System.currentTimeMillis(); - frameToString = frame.toString(); - _logger.debug("RECV: " + frame); - } - - // Check that this channel is not closing - if (channelAwaitingClosure(channelId)) - { - if ((frame.getBodyFrame() instanceof ChannelCloseOkBody)) - { - if (_logger.isInfoEnabled()) - { - _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok"); - } - } - else - { - // The channel has been told to close, we don't process any more frames until - // it's closed. - return; - } - } - - try - { - body.handle(channelId, this); - } - catch(AMQConnectionException e) - { - _logger.info(e.getMessage() + " whilst processing frame: " + body); - closeConnection(channelId, e); - throw e; - } - catch (AMQException e) - { - closeChannel(channelId, e.getErrorCode() == null ? AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage()); - throw e; - } - catch (TransportException e) - { - closeChannel(channelId, AMQConstant.CHANNEL_ERROR, e.getMessage()); - throw e; - } - - if(_logger.isDebugEnabled()) - { - _logger.debug("Frame handled in " + (System.currentTimeMillis() - startTime) + " ms. Frame: " + frameToString); - } + _channelsForCurrentMessage.add(amqChannel); } private synchronized void protocolInitiationReceived(ProtocolInitiation pi) @@ -623,148 +494,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return buf; } - public void methodFrameReceived(int channelId, AMQMethodBody methodBody) - { - final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody); - - try - { - try - { - boolean wasAnyoneInterested = methodReceived(evt); - - if (!wasAnyoneInterested) - { - throw new AMQNoMethodHandlerException(evt); - } - } - catch (AMQChannelException e) - { - if (getChannel(channelId) != null) - { - if (_logger.isInfoEnabled()) - { - _logger.info("Closing channel due to: " + e.getMessage()); - } - - AMQConstant errorType = e.getErrorCode(); - if(errorType == null) - { - errorType = AMQConstant.INTERNAL_ERROR; - } - writeFrame(new AMQFrame(channelId, - getMethodRegistry().createChannelCloseBody(errorType.getCode(), - AMQShortString.validValueOf(e.getMessage()), - e.getClassId(), - e.getMethodId()))); - closeChannel(channelId, errorType, e.getMessage()); - } - else - { - if (_logger.isDebugEnabled()) - { - _logger.debug("ChannelException occurred on non-existent channel:" + e.getMessage()); - } - - if (_logger.isInfoEnabled()) - { - _logger.info("Closing connection due to: " + e.getMessage()); - } - - AMQConnectionException ce = new AMQConnectionException(AMQConstant.CHANNEL_ERROR, - AMQConstant.CHANNEL_ERROR.getName().toString(), - methodBody, getMethodRegistry()); - - _logger.info(e.getMessage() + " whilst processing:" + methodBody); - closeConnection(channelId, ce); - } - } - catch (AMQConnectionException e) - { - _logger.info(e.getMessage() + " whilst processing:" + methodBody); - closeConnection(channelId, e); - } - } - catch (Exception e) - { - _logger.error("Unexpected exception while processing frame. Closing connection.", e); - - closeProtocolSession(); - } - } - private <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException - { - final MethodDispatcher dispatcher = getMethodDispatcher(); - - final int channelId = evt.getChannelId(); - final B body = evt.getMethod(); - - final AMQChannel channel = getChannel(channelId); - if(channelId != 0 && channel == null) - { - - if(! ((body instanceof ChannelOpenBody) - || (body instanceof ChannelCloseOkBody) - || (body instanceof ChannelCloseBody))) - { - throw new AMQConnectionException(AMQConstant.CHANNEL_ERROR, "channel is closed won't process:" + body, body, getMethodRegistry()); - } - - } - if(channel == null) - { - return body.execute(dispatcher, channelId); - } - else - { - try - { - return Subject.doAs(channel.getSubject(), new PrivilegedExceptionAction<Boolean>() - { - @Override - public Boolean run() throws AMQException - { - return body.execute(dispatcher, channelId); - } - }); - } - catch (PrivilegedActionException e) - { - if(e.getCause() instanceof AMQException) - { - throw (AMQException) e.getCause(); - } - else - { - throw new ServerScopedRuntimeException(e.getCause()); - } - } - - - } - - } - - public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException - { - - AMQChannel channel = getAndAssertChannel(channelId); - - channel.publishContentHeader(body); - - } - - public void contentBodyReceived(int channelId, ContentBody body) throws AMQException - { - AMQChannel channel = getAndAssertChannel(channelId); - - channel.publishContentBody(body); - } - - public void heartbeatBodyReceived(int channelId, HeartbeatBody body) - { - // NO - OP - } /** * Convenience method that writes a frame to the protocol session. Equivalent to calling @@ -808,19 +537,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { synchronized (_channelMap) { - return new ArrayList<AMQChannel>(_channelMap.values()); - } - } - - public AMQChannel getAndAssertChannel(int channelId) throws AMQException - { - AMQChannel channel = getChannel(channelId); - if (channel == null) - { - throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId); + return new ArrayList<>(_channelMap.values()); } - - return channel; } public AMQChannel getChannel(int channelId) @@ -899,8 +617,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, writeFrame(new AMQFrame(channel.getChannelId(), getMethodRegistry().createChannelCloseBody(cause.getCode(), AMQShortString.validValueOf(message), - _methodProcessor.getClassId(), - _methodProcessor.getMethodId()))); + _currentClassId, + _currentMethodId))); closeChannel(channel, cause, message, true); } @@ -1106,7 +824,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { _logger.info("Closing connection due to: " + message); } - closeConnection(channelId, new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(), AMQShortString.validValueOf(message), _methodProcessor.getClassId(), _methodProcessor.getMethodId()))); + closeConnection(channelId, new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(), AMQShortString.validValueOf(message), _currentClassId, _currentMethodId))); } private void closeConnection(int channelId, AMQFrame frame) @@ -1224,9 +942,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { _protocolVersion = pv; _methodRegistry.setProtocolVersion(_protocolVersion); - _methodProcessor.setProtocolVersion(_protocolVersion); _protocolOutputConverter = new ProtocolOutputConverterImpl(this); - _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(this); } public byte getProtocolMajorVersion() @@ -1335,11 +1051,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return _methodRegistry; } - public MethodDispatcher getMethodDispatcher() - { - return _dispatcher; - } - public void closed() { try @@ -1353,14 +1064,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, closeProtocolSession(); } } - catch (ConnectionScopedRuntimeException e) + catch (ConnectionScopedRuntimeException | TransportException e) { _logger.error("Could not close protocol engine", e); } - catch (TransportException e) - { - _logger.error("Could not close protocol engine", e); - } } public void readerIdle() @@ -1427,11 +1134,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } } - public void setSender(Sender<ByteBuffer> sender) - { - // Do nothing - } - public long getReadBytes() { return _readBytes; @@ -1572,7 +1274,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, public List<AMQChannel> getSessionModels() { - return new ArrayList<AMQChannel>(getChannels()); + return new ArrayList<>(getChannels()); } public LogSubject getLogSubject() @@ -2074,4 +1776,52 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return _broker.getEventLogger(); } } + + @Override + public ServerChannelMethodProcessor getChannelMethodProcessor(final int channelId) + { + ServerChannelMethodProcessor channelMethodProcessor = getChannel(channelId); + if(channelMethodProcessor == null) + { + channelMethodProcessor = (ServerChannelMethodProcessor) Proxy.newProxyInstance(ServerMethodDispatcher.class.getClassLoader(), + new Class[] { ServerChannelMethodProcessor.class }, new InvocationHandler() + { + @Override + public Object invoke(final Object proxy, final Method method, final Object[] args) + throws Throwable + { + closeConnection(AMQConstant.CHANNEL_ERROR, "Unknown channel id: " + channelId, channelId); + + return null; + } + }); + } + return channelMethodProcessor; + } + + @Override + public void receiveHeartbeat() + { + // No op + } + + @Override + public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation) + { + protocolInitiationReceived(protocolInitiation); + } + + @Override + public void setCurrentMethod(final int classId, final int methodId) + { + _currentClassId = classId; + _currentMethodId = methodId; + } + + @Override + public boolean ignoreAllButCloseOk() + { + return _closing.get(); + } + } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java new file mode 100644 index 0000000000..5a4466b003 --- /dev/null +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import java.io.IOException; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; + +import javax.security.auth.Subject; + +import org.apache.qpid.codec.MarkableDataInput; +import org.apache.qpid.codec.ServerDecoder; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.server.util.ServerScopedRuntimeException; + +public class BrokerDecoder extends ServerDecoder +{ + private final AMQProtocolEngine _connection; + /** + * Creates a new AMQP decoder. + * + * @param connection + */ + public BrokerDecoder(final AMQProtocolEngine connection) + { + super(connection); + _connection = connection; + } + + @Override + protected void processFrame(final int channelId, final byte type, final long bodySize, final MarkableDataInput in) + throws AMQFrameDecodingException, IOException + { + Subject subject; + AMQChannel channel = _connection.getChannel(channelId); + if(channel == null) + { + subject = _connection.getSubject(); + } + else + { + _connection.channelRequiresSync(channel); + + subject = channel.getSubject(); + } + try + { + Subject.doAs(subject, new PrivilegedExceptionAction<Object>() + { + @Override + public Void run() throws IOException, AMQFrameDecodingException + { + doProcessFrame(channelId, type, bodySize, in); + return null; + } + }); + } + catch (PrivilegedActionException e) + { + Throwable cause = e.getCause(); + if(cause instanceof IOException) + { + throw (IOException) cause; + } + else if(cause instanceof AMQFrameDecodingException) + { + throw (AMQFrameDecodingException) cause; + } + else if(cause instanceof RuntimeException) + { + throw (RuntimeException) cause; + } + else throw new ServerScopedRuntimeException(cause); + } + + } + + + private void doProcessFrame(final int channelId, final byte type, final long bodySize, final MarkableDataInput in) + throws AMQFrameDecodingException, IOException + { + super.processFrame(channelId, type, bodySize, in); + + } + +} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessor.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessor.java deleted file mode 100644 index d4c7f151e7..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessor.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; - -public interface ChannelMethodProcessor -{ - void receiveAccessRequest(AMQShortString realm, - boolean exclusive, - boolean passive, - boolean active, - boolean write, - boolean read); - - void receiveBasicAck(long deliveryTag, boolean multiple); - - void receiveBasicCancel(AMQShortString consumerTag, boolean nowait); - - void receiveBasicConsume(AMQShortString queue, - AMQShortString consumerTag, - boolean noLocal, - boolean noAck, - boolean exclusive, - boolean nowait, - FieldTable arguments); - - void receiveBasicGet(AMQShortString queue, boolean noAck); - - void receiveBasicPublish(AMQShortString exchange, - AMQShortString routingKey, - boolean mandatory, - boolean immediate); - - void receiveBasicQos(long prefetchSize, int prefetchCount, boolean global); - - void receiveBasicRecover(boolean requeue, boolean sync); - - void receiveBasicReject(long deliveryTag, boolean requeue); - - void receiveChannelClose(); - - void receiveChannelCloseOk(); - - void receiveChannelFlow(boolean active); - - void receiveExchangeBound(AMQShortString exchange, AMQShortString queue, AMQShortString routingKey); - - void receiveExchangeDeclare(AMQShortString exchange, - AMQShortString type, - boolean passive, - boolean durable, - boolean autoDelete, - boolean internal, - boolean nowait, - FieldTable arguments); - - void receiveExchangeDelete(AMQShortString exchange, boolean ifUnused, boolean nowait); - - void receiveQueueBind(AMQShortString queue, - AMQShortString exchange, - AMQShortString routingKey, - boolean nowait, - FieldTable arguments); - - void receiveQueueDeclare(AMQShortString queueStr, - boolean passive, - boolean durable, - boolean exclusive, - boolean autoDelete, - boolean nowait, - FieldTable arguments); - - void receiveQueueDelete(AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait); - - void receiveQueuePurge(AMQShortString queue, boolean nowait); - - void receiveQueueUnbind(AMQShortString queue, - AMQShortString exchange, - AMQShortString routingKey, - FieldTable arguments); - - void receiveTxSelect(); - - void receiveTxCommit(); - - void receiveTxRollback(); - - -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConnectionMethodProcessor.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConnectionMethodProcessor.java deleted file mode 100644 index 6e657c022e..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConnectionMethodProcessor.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; - -public interface ConnectionMethodProcessor -{ - void receiveChannelOpen(int channelId); - - void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist); - - void receiveConnectionClose(int replyCode, AMQShortString replyText, int classId, int methodId); - - void receiveConnectionCloseOk(); - - void receiveConnectionSecureOk(byte[] response); - - void receiveConnectionStartOk(FieldTable clientProperties, - AMQShortString mechanism, - byte[] response, - AMQShortString locale); - - void receiveConnectionTuneOk(int channelMax, long frameMax, int heartbeat); -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java index 821d6101db..d966e9c9c6 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java @@ -20,16 +20,15 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.AMQException; +import java.util.ArrayList; +import java.util.List; + import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.MessagePublishInfo; import org.apache.qpid.server.message.MessageDestination; -import java.util.ArrayList; -import java.util.List; - public class IncomingMessage { @@ -58,7 +57,7 @@ public class IncomingMessage return _messagePublishInfo; } - public void addContentBodyFrame(final ContentBody contentChunk) throws AMQException + public void addContentBodyFrame(final ContentBody contentChunk) { _bodyLengthReceived += contentChunk.getSize(); _contentChunks.add(contentChunk); @@ -94,7 +93,7 @@ public class IncomingMessage _messageDestination = e; } - public int getBodyCount() throws AMQException + public int getBodyCount() { return _contentChunks.size(); } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java deleted file mode 100644 index b3ee5f9ff9..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java +++ /dev/null @@ -1,826 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import java.security.PrivilegedAction; - -import javax.security.auth.Subject; -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.*; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.model.Broker; - -public class ServerMethodDispatcherImpl implements MethodDispatcher -{ - private static final Logger _logger = Logger.getLogger(ServerMethodDispatcherImpl.class); - - private final AMQProtocolEngine _connection; - - - private static interface ChannelAction - { - void onChannel(ChannelMethodProcessor channel); - } - - - private static interface ConnectionAction - { - void onConnection(ConnectionMethodProcessor connection); - } - - - public static MethodDispatcher createMethodDispatcher(AMQProtocolEngine connection) - { - return new ServerMethodDispatcherImpl(connection); - } - - - public ServerMethodDispatcherImpl(AMQProtocolEngine connection) - { - _connection = connection; - } - - - protected final AMQProtocolEngine getConnection() - { - return _connection; - } - - private void processChannelMethod(int channelId, final ChannelAction action) - { - final AMQChannel channel = _connection.getChannel(channelId); - if (channel == null) - { - closeConnection(AMQConstant.CHANNEL_ERROR, "Unknown channel id: " + channelId); - } - else - { - Subject.doAs(channel.getSubject(), new PrivilegedAction<Void>() - { - @Override - public Void run() - { - action.onChannel(channel); - return null; - } - }); - } - - } - - private void processConnectionMethod(final ConnectionAction action) - { - Subject.doAs(_connection.getSubject(), new PrivilegedAction<Void>() - { - @Override - public Void run() - { - action.onConnection(_connection); - return null; - } - }); - - - } - - public boolean dispatchAccessRequest(final AccessRequestBody body, int channelId) - { - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveAccessRequest(body.getRealm(), - body.getExclusive(), - body.getPassive(), - body.getActive(), - body.getWrite(), - body.getRead()); - } - } - ); - - return true; - } - - public boolean dispatchBasicAck(final BasicAckBody body, int channelId) - { - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveBasicAck(body.getDeliveryTag(), body.getMultiple()); - } - } - ); - - return true; - } - - public boolean dispatchBasicCancel(final BasicCancelBody body, int channelId) - { - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveBasicCancel(body.getConsumerTag(), - body.getNowait() - ); - } - } - ); - return true; - } - - public boolean dispatchBasicConsume(final BasicConsumeBody body, int channelId) - { - - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveBasicConsume(body.getQueue(), body.getConsumerTag(), - body.getNoLocal(), body.getNoAck(), - body.getExclusive(), body.getNowait(), - body.getArguments()); - } - } - ); - - - return true; - } - - private void closeConnection(final AMQConstant constant, - final String message) - { - _connection.closeConnection(constant, message, 0); - } - - public boolean dispatchBasicGet(final BasicGetBody body, int channelId) - { - - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveBasicGet(body.getQueue(), body.getNoAck()); - } - } - ); - return true; - } - - public boolean dispatchBasicPublish(final BasicPublishBody body, int channelId) - { - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveBasicPublish(body.getExchange(), body.getRoutingKey(), - body.getMandatory(), body.getImmediate()); - } - } - ); - - return true; - } - - public boolean dispatchBasicQos(final BasicQosBody body, int channelId) - { - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveBasicQos(body.getPrefetchSize(), body.getPrefetchCount(), - body.getGlobal()); - } - } - ); - - return true; - } - - public boolean dispatchBasicRecover(final BasicRecoverBody body, int channelId) - { - final boolean sync = _connection.getProtocolVersion().equals(ProtocolVersion.v8_0); - - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveBasicRecover(body.getRequeue(), sync); - } - } - ); - - return true; - } - - public boolean dispatchBasicReject(final BasicRejectBody body, int channelId) - { - - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveBasicReject(body.getDeliveryTag(), body.getRequeue()); - } - } - ); - - return true; - } - - public boolean dispatchChannelOpen(ChannelOpenBody body, final int channelId) - { - processConnectionMethod(new ConnectionAction() - { - @Override - public void onConnection(final ConnectionMethodProcessor connection) - { - connection.receiveChannelOpen(channelId); - } - }); - return true; - } - - - public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - - public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchBasicConsumeOk(BasicConsumeOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchBasicGetEmpty(BasicGetEmptyBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchBasicGetOk(BasicGetOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchBasicQosOk(BasicQosOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) - { - - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveChannelClose(); - } - } - ); - - return true; - } - - - public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) - { - - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveChannelCloseOk(); - } - } - ); - - return true; - } - - - public boolean dispatchChannelFlow(final ChannelFlowBody body, int channelId) - { - - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveChannelFlow(body.getActive()); - } - } - ); - return true; - } - - public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchChannelOpenOk(ChannelOpenOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - - public boolean dispatchConnectionOpen(final ConnectionOpenBody body, int channelId) - { - processConnectionMethod(new ConnectionAction() - { - @Override - public void onConnection(final ConnectionMethodProcessor connection) - { - connection.receiveConnectionOpen(body.getVirtualHost(), body.getCapabilities(), body.getInsist()); - } - }); - - return true; - } - - - public boolean dispatchConnectionClose(final ConnectionCloseBody body, int channelId) - { - - processConnectionMethod(new ConnectionAction() - { - @Override - public void onConnection(final ConnectionMethodProcessor connection) - { - connection.receiveConnectionClose(body.getReplyCode(), - body.getReplyText(), - body.getClassId(), - body.getMethodId()); - } - }); - - return true; - } - - - public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) - { - - processConnectionMethod(new ConnectionAction() - { - @Override - public void onConnection(final ConnectionMethodProcessor connection) - { - connection.receiveConnectionCloseOk(); - } - }); - - return true; - } - - public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - - public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchExchangeDeclareOk(ExchangeDeclareOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchExchangeDeleteOk(ExchangeDeleteOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchQueueDeclareOk(QueueDeclareOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchQueuePurgeOk(QueuePurgeOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchTxRollbackOk(TxRollbackOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - - public boolean dispatchConnectionSecureOk(final ConnectionSecureOkBody body, int channelId) - { - - processConnectionMethod(new ConnectionAction() - { - @Override - public void onConnection(final ConnectionMethodProcessor connection) - { - connection.receiveConnectionSecureOk(body.getResponse()); - } - }); - - return true; - } - - private void disposeSaslServer(AMQProtocolEngine connection) - { - SaslServer ss = connection.getSaslServer(); - if (ss != null) - { - connection.setSaslServer(null); - try - { - ss.dispose(); - } - catch (SaslException e) - { - _logger.error("Error disposing of Sasl server: " + e); - } - } - } - - public boolean dispatchConnectionStartOk(final ConnectionStartOkBody body, int channelId) - { - - processConnectionMethod(new ConnectionAction() - { - @Override - public void onConnection(final ConnectionMethodProcessor connection) - { - connection.receiveConnectionStartOk(body.getClientProperties(), - body.getMechanism(), - body.getResponse(), - body.getLocale()); - } - }); - - return true; - } - - public boolean dispatchConnectionTuneOk(final ConnectionTuneOkBody body, int channelId) - { - - processConnectionMethod(new ConnectionAction() - { - @Override - public void onConnection(final ConnectionMethodProcessor connection) - { - connection.receiveConnectionTuneOk(body.getChannelMax(), - body.getFrameMax(), - body.getHeartbeat()); - } - }); - final AMQProtocolEngine connection = getConnection(); - - - return true; - } - - public boolean dispatchExchangeBound(final ExchangeBoundBody body, int channelId) - { - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveExchangeBound(body.getExchange(), body.getQueue(), body.getRoutingKey()); - } - } - ); - - return true; - } - - public boolean dispatchExchangeDeclare(final ExchangeDeclareBody body, int channelId) - { - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveExchangeDeclare(body.getExchange(), body.getType(), - body.getPassive(), - body.getDurable(), - body.getAutoDelete(), - body.getInternal(), - body.getNowait(), - body.getArguments()); - } - } - ); - - return true; - } - - public boolean dispatchExchangeDelete(final ExchangeDeleteBody body, int channelId) - { - - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveExchangeDelete(body.getExchange(), - body.getIfUnused(), - body.getNowait()); - } - } - ); - - return true; - } - - public boolean dispatchQueueBind(final QueueBindBody body, int channelId) - { - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveQueueBind(body.getQueue(), - body.getExchange(), - body.getRoutingKey(), - body.getNowait(), - body.getArguments()); - } - } - ); - - return true; - } - - public boolean dispatchQueueDeclare(final QueueDeclareBody body, int channelId) - { - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveQueueDeclare(body.getQueue(), - body.getPassive(), - body.getDurable(), - body.getExclusive(), - body.getAutoDelete(), - body.getNowait(), - body.getArguments()); - } - } - ); - - return true; - } - - public boolean dispatchQueueDelete(final QueueDeleteBody body, int channelId) - { - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveQueueDelete(body.getQueue(), - body.getIfUnused(), - body.getIfEmpty(), - body.getNowait()); - } - } - ); - - return true; - } - - public boolean dispatchQueuePurge(final QueuePurgeBody body, int channelId) - { - - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveQueuePurge(body.getQueue(), - body.getNowait()); - } - } - ); - - return true; - } - - - public boolean dispatchTxCommit(TxCommitBody body, final int channelId) - { - - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveTxCommit(); - } - } - ); - - return true; - } - - public boolean dispatchTxRollback(TxRollbackBody body, final int channelId) - { - - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveTxRollback(); - } - } - ); - return true; - } - - public boolean dispatchTxSelect(TxSelectBody body, int channelId) - { - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveTxSelect(); - } - } - ); - return true; - } - - public boolean dispatchBasicRecoverSync(final BasicRecoverSyncBody body, int channelId) - { - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveBasicRecover(body.getRequeue(), true); - } - } - ); - - return true; - } - - public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - @Override - public boolean dispatchChannelAlert(final ChannelAlertBody body, final int channelId) - throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchQueueUnbind(final QueueUnbindBody body, int channelId) - { - - processChannelMethod(channelId, - new ChannelAction() - { - @Override - public void onChannel(final ChannelMethodProcessor channel) - { - channel.receiveQueueUnbind(body.getQueue(), - body.getExchange(), - body.getRoutingKey(), - body.getArguments()); - } - } - ); - - return true; - } - -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java deleted file mode 100644 index 625836bcf2..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java +++ /dev/null @@ -1,964 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import java.security.PrivilegedAction; - -import javax.security.auth.Subject; -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; - -import org.apache.log4j.Logger; - -import org.apache.qpid.framing.*; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.security.SubjectCreator; -import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; - -public class ServerMethodProcessor implements MethodProcessor -{ - private static final Logger LOGGER = Logger.getLogger(ServerMethodProcessor.class); - private int _classId; - private int _methodId; - - - private static interface ChannelAction - { - void onChannel(ChannelMethodProcessor channel); - } - - private ProtocolVersion _protocolVersion; - private ServerMethodDispatcherImpl _dispatcher; - private AMQProtocolEngine _connection; - - public ServerMethodProcessor(final ProtocolVersion protocolVersion) - { - _protocolVersion = protocolVersion; - } - - - private void processChannelMethod(int channelId, final ChannelAction action) - { - final AMQChannel channel = _connection.getChannel(channelId); - if (channel == null) - { - // TODO throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - else - { - Subject.doAs(channel.getSubject(), new PrivilegedAction<Void>() - { - @Override - public Void run() - { - action.onChannel(channel); - return null; - } - }); - } - - } - - @Override - public void receiveConnectionStart(final short versionMajor, - final short versionMinor, - final FieldTable serverProperties, - final byte[] mechanisms, - final byte[] locales) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(0, - new ConnectionStartBody(versionMajor, - versionMinor, - serverProperties, - mechanisms, - locales)); - } - _connection.closeConnection(AMQConstant.COMMAND_INVALID, "Unexpected method received: ConnectionStart", 0 - ); - - } - - @Override - public void receiveConnectionStartOk(final FieldTable clientProperties, - final AMQShortString mechanism, - final byte[] response, - final AMQShortString locale) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(0, new ConnectionStartOkBody(clientProperties, mechanism, response, locale)); - } - - Broker<?> broker = _connection.getBroker(); - - SubjectCreator subjectCreator = _connection.getSubjectCreator(); - SaslServer ss = null; - try - { - ss = subjectCreator.createSaslServer(String.valueOf(mechanism), - _connection.getLocalFQDN(), - _connection.getPeerPrincipal()); - - if (ss == null) - { - _connection.closeConnection(AMQConstant.RESOURCE_ERROR, - "Unable to create SASL Server:" + mechanism, 0 - ); - } - else - { - _connection.setSaslServer(ss); - - final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, response); - //save clientProperties - _connection.setClientProperties(clientProperties); - - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - - switch (authResult.getStatus()) - { - case ERROR: - Exception cause = authResult.getCause(); - - LOGGER.info("Authentication failed:" + (cause == null ? "" : cause.getMessage())); - - _connection.closeConnection(AMQConstant.NOT_ALLOWED, - AMQConstant.NOT_ALLOWED.getName().toString(), 0 - ); - - disposeSaslServer(); - break; - - case SUCCESS: - if (LOGGER.isInfoEnabled()) - { - LOGGER.info("Connected as: " + authResult.getSubject()); - } - _connection.setAuthorizedSubject(authResult.getSubject()); - - int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); - - if (frameMax <= 0) - { - frameMax = Integer.MAX_VALUE; - } - - ConnectionTuneBody - tuneBody = - methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), - frameMax, - broker.getConnection_heartBeatDelay()); - _connection.writeFrame(tuneBody.generateFrame(0)); - break; - case CONTINUE: - ConnectionSecureBody - secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge()); - _connection.writeFrame(secureBody.generateFrame(0)); - } - } - } - catch (SaslException e) - { - disposeSaslServer(); - - _connection.closeConnection(AMQConstant.RESOURCE_ERROR, "SASL error: " + e.getMessage(), 0 - ); - } - - } - - @Override - public void receiveTxSelect(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, TxSelectBody.INSTANCE); - } - - } - - @Override - public void receiveTxSelectOk(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, TxSelectOkBody.INSTANCE); - } - - } - - @Override - public void receiveTxCommit(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, TxCommitBody.INSTANCE); - } - - } - - @Override - public void receiveTxCommitOk(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, TxCommitOkBody.INSTANCE); - } - - } - - @Override - public void receiveTxRollback(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, TxRollbackBody.INSTANCE); - } - - } - - @Override - public void receiveTxRollbackOk(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, TxRollbackOkBody.INSTANCE); - } - - } - - @Override - public void receiveConnectionSecure(final byte[] challenge) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(0, new ConnectionSecureBody(challenge)); - } - - } - - @Override - public void receiveConnectionSecureOk(final byte[] response) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(0, new ConnectionSecureOkBody(response)); - } - - } - - @Override - public void receiveConnectionTune(final int channelMax, final long frameMax, final int heartbeat) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(0, new ConnectionTuneBody(channelMax, frameMax, heartbeat)); - } - - } - - @Override - public void receiveConnectionTuneOk(final int channelMax, final long frameMax, final int heartbeat) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(0, new ConnectionTuneOkBody(channelMax, frameMax, heartbeat)); - } - - } - - @Override - public void receiveConnectionOpen(final AMQShortString virtualHost, - final AMQShortString capabilities, - final boolean insist) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(0, new ConnectionOpenBody(virtualHost, capabilities, insist)); - } - - } - - @Override - public void receiveConnectionOpenOk(final AMQShortString knownHosts) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(0, new ConnectionOpenOkBody(knownHosts)); - } - - } - - @Override - public void receiveConnectionRedirect(final AMQShortString host, final AMQShortString knownHosts) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(0, new ConnectionRedirectBody(getProtocolVersion(), host, knownHosts)); - } - - } - - @Override - public void receiveConnectionClose(final int replyCode, - final AMQShortString replyText, - final int classId, - final int methodId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(0, - new ConnectionCloseBody(getProtocolVersion(), - replyCode, - replyText, - classId, - methodId)); - } - - } - - @Override - public void receiveConnectionCloseOk() - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(0, ProtocolVersion.v8_0.equals(getProtocolVersion()) - ? ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_8 - : ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_9); - } - } - - @Override - public void receiveChannelOpen(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new ChannelOpenBody()); - } - - } - - @Override - public void receiveChannelOpenOk(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, ProtocolVersion.v8_0.equals(getProtocolVersion()) - ? ChannelOpenOkBody.INSTANCE_0_8 - : ChannelOpenOkBody.INSTANCE_0_9); - } - } - - @Override - public void receiveChannelFlow(final int channelId, final boolean active) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new ChannelFlowBody(active)); - } - - } - - @Override - public void receiveChannelFlowOk(final int channelId, final boolean active) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new ChannelFlowOkBody(active)); - } - - } - - @Override - public void receiveChannelAlert(final int channelId, - final int replyCode, - final AMQShortString replyText, - final FieldTable details) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new ChannelAlertBody(replyCode, replyText, details)); - } - - } - - @Override - public void receiveChannelClose(final int channelId, - final int replyCode, - final AMQShortString replyText, - final int classId, - final int methodId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new ChannelCloseBody(replyCode, replyText, classId, methodId)); - } - - } - - @Override - public void receiveChannelCloseOk(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, ChannelCloseOkBody.INSTANCE); - } - - } - - @Override - public void receiveAccessRequest(final int channelId, - final AMQShortString realm, - final boolean exclusive, - final boolean passive, - final boolean active, - final boolean write, - final boolean read) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = - new AMQFrame(channelId, new AccessRequestBody(realm, exclusive, passive, active, write, read)); - } - - } - - @Override - public void receiveAccessRequestOk(final int channelId, final int ticket) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new AccessRequestOkBody(ticket)); - } - - } - - @Override - public void receiveExchangeDeclare(final int channelId, - final AMQShortString exchange, - final AMQShortString type, - final boolean passive, - final boolean durable, - final boolean autoDelete, - final boolean internal, - final boolean nowait, final FieldTable arguments) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, - new ExchangeDeclareBody(0, - exchange, - type, - passive, - durable, - autoDelete, - internal, - nowait, - arguments)); - } - - } - - @Override - public void receiveExchangeDeclareOk(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new ExchangeDeclareOkBody()); - } - - } - - @Override - public void receiveExchangeDelete(final int channelId, - final AMQShortString exchange, - final boolean ifUnused, - final boolean nowait) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new ExchangeDeleteBody(0, exchange, ifUnused, nowait)); - } - - } - - @Override - public void receiveExchangeDeleteOk(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new ExchangeDeleteOkBody()); - } - - } - - @Override - public void receiveExchangeBound(final int channelId, - final AMQShortString exchange, - final AMQShortString routingKey, - final AMQShortString queue) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new ExchangeBoundBody(exchange, routingKey, queue)); - } - - } - - @Override - public void receiveExchangeBoundOk(final int channelId, final int replyCode, final AMQShortString replyText) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new ExchangeBoundOkBody(replyCode, replyText)); - } - - } - - @Override - public void receiveQueueBindOk(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new QueueBindOkBody()); - } - - } - - @Override - public void receiveQueueUnbindOk(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new QueueUnbindOkBody()); - } - - } - - @Override - public void receiveQueueDeclare(final int channelId, - final AMQShortString queue, - final boolean passive, - final boolean durable, - final boolean exclusive, - final boolean autoDelete, - final boolean nowait, - final FieldTable arguments) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, - new QueueDeclareBody(0, - queue, - passive, - durable, - exclusive, - autoDelete, - nowait, - arguments)); - } - - } - - @Override - public void receiveQueueDeclareOk(final int channelId, - final AMQShortString queue, - final long messageCount, - final long consumerCount) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new QueueDeclareOkBody(queue, messageCount, consumerCount)); - } - - } - - @Override - public void receiveQueueBind(final int channelId, - final AMQShortString queue, - final AMQShortString exchange, - final AMQShortString bindingKey, - final boolean nowait, - final FieldTable arguments) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = - new AMQFrame(channelId, new QueueBindBody(0, queue, exchange, bindingKey, nowait, arguments)); - } - - } - - @Override - public void receiveQueuePurge(final int channelId, final AMQShortString queue, final boolean nowait) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new QueuePurgeBody(0, queue, nowait)); - } - - } - - @Override - public void receiveQueuePurgeOk(final int channelId, final long messageCount) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new QueuePurgeOkBody(messageCount)); - } - - } - - @Override - public void receiveQueueDelete(final int channelId, - final AMQShortString queue, - final boolean ifUnused, - final boolean ifEmpty, - final boolean nowait) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new QueueDeleteBody(0, queue, ifUnused, ifEmpty, nowait)); - } - - } - - @Override - public void receiveQueueDeleteOk(final int channelId, final long messageCount) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new QueueDeleteOkBody(messageCount)); - } - - } - - @Override - public void receiveQueueUnbind(final int channelId, - final AMQShortString queue, - final AMQShortString exchange, - final AMQShortString bindingKey, - final FieldTable arguments) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new QueueUnbindBody(0, queue, exchange, bindingKey, arguments)); - } - - } - - @Override - public void receiveBasicRecoverSyncOk(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new BasicRecoverSyncOkBody(getProtocolVersion())); - } - - } - - @Override - public void receiveBasicRecover(final int channelId, final boolean requeue, final boolean sync) - { - if (ProtocolVersion.v8_0.equals(getProtocolVersion()) || !sync) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new BasicRecoverBody(requeue)); - } - - } - else - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new BasicRecoverSyncBody(getProtocolVersion(), requeue)); - } - - } - } - - @Override - public void receiveBasicQos(final int channelId, - final long prefetchSize, - final int prefetchCount, - final boolean global) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new BasicQosBody(prefetchSize, prefetchCount, global)); - } - - } - - @Override - public void receiveBasicQosOk(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new BasicQosOkBody()); - } - - } - - @Override - public void receiveBasicConsume(final int channelId, - final AMQShortString queue, - final AMQShortString consumerTag, - final boolean noLocal, - final boolean noAck, - final boolean exclusive, - final boolean nowait, - final FieldTable arguments) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, - new BasicConsumeBody(0, - queue, - consumerTag, - noLocal, - noAck, - exclusive, - nowait, - arguments)); - } - - } - - @Override - public void receiveBasicConsumeOk(final int channelId, final AMQShortString consumerTag) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag)); - } - - } - - @Override - public void receiveBasicCancel(final int channelId, final AMQShortString consumerTag, final boolean noWait) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new BasicCancelBody(consumerTag, noWait)); - } - - } - - @Override - public void receiveBasicCancelOk(final int channelId, final AMQShortString consumerTag) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new BasicCancelOkBody(consumerTag)); - } - - } - - @Override - public void receiveBasicPublish(final int channelId, - final AMQShortString exchange, - final AMQShortString routingKey, - final boolean mandatory, - final boolean immediate) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = - new AMQFrame(channelId, new BasicPublishBody(0, exchange, routingKey, mandatory, immediate)); - } - - } - - @Override - public void receiveBasicReturn(final int channelId, final int replyCode, - final AMQShortString replyText, - final AMQShortString exchange, - final AMQShortString routingKey) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new BasicReturnBody(replyCode, replyText, exchange, routingKey)); - } - - } - - @Override - public void receiveBasicDeliver(final int channelId, - final AMQShortString consumerTag, - final long deliveryTag, - final boolean redelivered, - final AMQShortString exchange, - final AMQShortString routingKey) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, - new BasicDeliverBody(consumerTag, - deliveryTag, - redelivered, - exchange, - routingKey)); - } - - } - - @Override - public void receiveBasicGet(final int channelId, final AMQShortString queue, final boolean noAck) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new BasicGetBody(0, queue, noAck)); - } - - } - - @Override - public void receiveBasicGetOk(final int channelId, - final long deliveryTag, - final boolean redelivered, - final AMQShortString exchange, - final AMQShortString routingKey, - final long messageCount) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, - new BasicGetOkBody(deliveryTag, - redelivered, - exchange, - routingKey, - messageCount)); - } - - } - - @Override - public void receiveBasicGetEmpty(final int channelId) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new BasicGetEmptyBody((AMQShortString) null)); - } - - } - - @Override - public void receiveBasicAck(final int channelId, final long deliveryTag, final boolean multiple) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new BasicAckBody(deliveryTag, multiple)); - } - - } - - @Override - public void receiveBasicReject(final int channelId, final long deliveryTag, final boolean requeue) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new BasicRejectBody(deliveryTag, requeue)); - } - - } - - @Override - public void receiveHeartbeat() - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(0, new HeartbeatBody()); - } - - } - - @Override - public ProtocolVersion getProtocolVersion() - { - return _protocolVersion; - } - - public void setProtocolVersion(final ProtocolVersion protocolVersion) - { - _protocolVersion = protocolVersion; - } - - @Override - public void receiveMessageContent(final int channelId, final byte[] data) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new ContentBody(data)); - } - - } - - @Override - public void receiveMessageHeader(final int channelId, - final BasicContentHeaderProperties properties, - final long bodySize) - { - if (LOGGER.isDebugEnabled()) - { - AMQFrame frame = new AMQFrame(channelId, new ContentHeaderBody(properties, bodySize)); - } - - } - - @Override - public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation) - { - if (LOGGER.isDebugEnabled()) - { - AMQDataBlock frame = protocolInitiation; - } - - } - - @Override - public void setCurrentMethod(final int classId, final int methodId) - { - _classId = classId; - _methodId = methodId; - } - - private void disposeSaslServer() - { - SaslServer ss = _connection.getSaslServer(); - if (ss != null) - { - _connection.setSaslServer(null); - try - { - ss.dispose(); - } - catch (SaslException e) - { - LOGGER.error("Error disposing of Sasl server: " + e); - } - } - } -} |