diff options
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java')
-rw-r--r-- | java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java | 140 |
1 files changed, 56 insertions, 84 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index e83e86981b..5e95701e5a 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -46,28 +46,11 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.framing.*; +import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.common.ServerPropertyNames; -import org.apache.qpid.framing.AMQBody; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQProtocolHeaderException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; -import org.apache.qpid.framing.HeartbeatBody; -import org.apache.qpid.framing.MethodDispatcher; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; @@ -95,6 +78,8 @@ import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; +import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; @@ -303,9 +288,24 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } receivedComplete(); } - catch (Exception e) + catch (ConnectionScopedRuntimeException e) + { + _logger.error("Unexpected exception", e); + closeProtocolSession(); + } + catch (AMQProtocolVersionException e) + { + _logger.error("Unexpected protocol version", e); + closeProtocolSession(); + } + catch (AMQFrameDecodingException e) { - _logger.error("Unexpected exception when processing datablocks", e); + _logger.error("Frame decoding", e); + closeProtocolSession(); + } + catch (IOException e) + { + _logger.error("I/O Exception", e); closeProtocolSession(); } finally @@ -314,34 +314,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } - private void receivedComplete() throws AMQException + private void receivedComplete() { - Exception exception = null; for (AMQChannel channel : _channelsForCurrentMessage) { - try - { - channel.receivedComplete(); - } - catch(Exception exceptionForThisChannel) - { - if(exception == null) - { - exception = exceptionForThisChannel; - } - _logger.error("Error informing channel that receiving is complete. Channel: " + channel, exceptionForThisChannel); - } + channel.receivedComplete(); } _channelsForCurrentMessage.clear(); - - if(exception != null) - { - throw new AMQException( - AMQConstant.INTERNAL_ERROR, - "Error informing channel that receiving is complete: " + exception.getMessage(), - exception); - } } /** @@ -549,7 +529,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } catch (IOException e) { - throw new RuntimeException(e); + throw new ServerScopedRuntimeException(e); } final ByteBuffer buf; @@ -628,12 +608,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _logger.info(e.getMessage() + " whilst processing:" + methodBody); closeConnection(channelId, e); } - catch (AMQSecurityException e) - { - AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); - _logger.info(e.getMessage() + " whilst processing:" + methodBody); - closeConnection(channelId, ce); - } } catch (Exception e) { @@ -818,16 +792,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi * * @param channelId id of the channel to close * - * @throws AMQException if an error occurs closing the channel * @throws IllegalArgumentException if the channel id is not valid */ @Override - public void closeChannel(int channelId) throws AMQException + public void closeChannel(int channelId) { closeChannel(channelId, null, null); } - public void closeChannel(int channelId, AMQConstant cause, String message) throws AMQException + public void closeChannel(int channelId, AMQConstant cause, String message) { final AMQChannel channel = getChannel(channelId); if (channel == null) @@ -909,7 +882,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi * * @throws AMQException if an error occurs while closing any channel */ - private void closeAllChannels() throws AMQException + private void closeAllChannels() { for (AMQChannel channel : getChannels()) { @@ -927,7 +900,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi /** This must be called when the session is _closed in order to free up any resources managed by the session. */ @Override - public void closeSession() throws AMQException + public void closeSession() { if(_closing.compareAndSet(false,true)) { @@ -1002,7 +975,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } - private void closeConnection(int channelId, AMQConnectionException e) throws AMQException + private void closeConnection(int channelId, AMQConnectionException e) { try { @@ -1039,7 +1012,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { _stateManager.changeState(AMQState.CONNECTION_CLOSED); } - catch (AMQException e) + catch (ConnectionScopedRuntimeException e) { _logger.info(e.getMessage()); } @@ -1240,9 +1213,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi closeProtocolSession(); } } - catch (AMQException e) + catch (ConnectionScopedRuntimeException e) { - _logger.error("Could not close protocol engine", e); + _logger.error("Could not close protocol engine", e); } catch (TransportException e) { @@ -1275,15 +1248,30 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } else { - _logger.error("Exception caught in " + this + ", closing connection explicitly: " + throwable, throwable); + try + { + _logger.error("Exception caught in " + this + ", closing connection explicitly: " + throwable, throwable); - MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(getProtocolVersion()); - ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0); + MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(getProtocolVersion()); + ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0); - writeFrame(closeBody.generateFrame(0)); + writeFrame(closeBody.generateFrame(0)); - _sender.close(); + _sender.close(); + } + finally + { + if(throwable instanceof Error) + { + throw (Error) throwable; + } + if(throwable instanceof ServerScopedRuntimeException) + { + throw (ServerScopedRuntimeException) throwable; + } + + } } } @@ -1447,15 +1435,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { writeFrame(responseBody.generateFrame(0)); - try - { + closeSession(); - closeSession(); - } - catch (AMQException ex) - { - throw new RuntimeException(ex); - } } finally { @@ -1489,15 +1470,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi try { writeFrame(responseBody.generateFrame(channelId)); - - try - { - closeChannel(channelId); - } - catch (AMQException ex) - { - throw new RuntimeException(ex); - } + closeChannel(channelId); } finally { @@ -1513,7 +1486,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return getContextKey().toString(); } - public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException + public void closeSession(AMQSessionModel session, AMQConstant cause, String message) { int channelId = ((AMQChannel)session).getChannelId(); closeChannel(channelId, cause, message); @@ -1528,7 +1501,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi writeFrame(responseBody.generateFrame(channelId)); } - public void close(AMQConstant cause, String message) throws AMQException + public void close(AMQConstant cause, String message) { closeConnection(0, new AMQConnectionException(cause, message, 0, 0, getProtocolOutputConverter().getProtocolMajorVersion(), @@ -1670,7 +1643,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi @Override public void deliverToClient(final Consumer sub, final ServerMessage message, final InstanceProperties props, final long deliveryTag) - throws AMQException { registerMessageDelivered(message.getSize()); _protocolOutputConverter.writeDeliver(message, |