summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java')
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java140
1 files changed, 56 insertions, 84 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index e83e86981b..5e95701e5a 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -46,28 +46,11 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.common.ServerPropertyNames;
-import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQProtocolHeaderException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
-import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.MethodDispatcher;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -95,6 +78,8 @@ import org.apache.qpid.server.protocol.v0_8.state.AMQState;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
@@ -303,9 +288,24 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
receivedComplete();
}
- catch (Exception e)
+ catch (ConnectionScopedRuntimeException e)
+ {
+ _logger.error("Unexpected exception", e);
+ closeProtocolSession();
+ }
+ catch (AMQProtocolVersionException e)
+ {
+ _logger.error("Unexpected protocol version", e);
+ closeProtocolSession();
+ }
+ catch (AMQFrameDecodingException e)
{
- _logger.error("Unexpected exception when processing datablocks", e);
+ _logger.error("Frame decoding", e);
+ closeProtocolSession();
+ }
+ catch (IOException e)
+ {
+ _logger.error("I/O Exception", e);
closeProtocolSession();
}
finally
@@ -314,34 +314,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
}
- private void receivedComplete() throws AMQException
+ private void receivedComplete()
{
- Exception exception = null;
for (AMQChannel channel : _channelsForCurrentMessage)
{
- try
- {
- channel.receivedComplete();
- }
- catch(Exception exceptionForThisChannel)
- {
- if(exception == null)
- {
- exception = exceptionForThisChannel;
- }
- _logger.error("Error informing channel that receiving is complete. Channel: " + channel, exceptionForThisChannel);
- }
+ channel.receivedComplete();
}
_channelsForCurrentMessage.clear();
-
- if(exception != null)
- {
- throw new AMQException(
- AMQConstant.INTERNAL_ERROR,
- "Error informing channel that receiving is complete: " + exception.getMessage(),
- exception);
- }
}
/**
@@ -549,7 +529,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
catch (IOException e)
{
- throw new RuntimeException(e);
+ throw new ServerScopedRuntimeException(e);
}
final ByteBuffer buf;
@@ -628,12 +608,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_logger.info(e.getMessage() + " whilst processing:" + methodBody);
closeConnection(channelId, e);
}
- catch (AMQSecurityException e)
- {
- AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
- _logger.info(e.getMessage() + " whilst processing:" + methodBody);
- closeConnection(channelId, ce);
- }
}
catch (Exception e)
{
@@ -818,16 +792,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
*
* @param channelId id of the channel to close
*
- * @throws AMQException if an error occurs closing the channel
* @throws IllegalArgumentException if the channel id is not valid
*/
@Override
- public void closeChannel(int channelId) throws AMQException
+ public void closeChannel(int channelId)
{
closeChannel(channelId, null, null);
}
- public void closeChannel(int channelId, AMQConstant cause, String message) throws AMQException
+ public void closeChannel(int channelId, AMQConstant cause, String message)
{
final AMQChannel channel = getChannel(channelId);
if (channel == null)
@@ -909,7 +882,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
*
* @throws AMQException if an error occurs while closing any channel
*/
- private void closeAllChannels() throws AMQException
+ private void closeAllChannels()
{
for (AMQChannel channel : getChannels())
{
@@ -927,7 +900,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
/** This must be called when the session is _closed in order to free up any resources managed by the session. */
@Override
- public void closeSession() throws AMQException
+ public void closeSession()
{
if(_closing.compareAndSet(false,true))
{
@@ -1002,7 +975,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
}
- private void closeConnection(int channelId, AMQConnectionException e) throws AMQException
+ private void closeConnection(int channelId, AMQConnectionException e)
{
try
{
@@ -1039,7 +1012,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
_stateManager.changeState(AMQState.CONNECTION_CLOSED);
}
- catch (AMQException e)
+ catch (ConnectionScopedRuntimeException e)
{
_logger.info(e.getMessage());
}
@@ -1240,9 +1213,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
closeProtocolSession();
}
}
- catch (AMQException e)
+ catch (ConnectionScopedRuntimeException e)
{
- _logger.error("Could not close protocol engine", e);
+ _logger.error("Could not close protocol engine", e);
}
catch (TransportException e)
{
@@ -1275,15 +1248,30 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
else
{
- _logger.error("Exception caught in " + this + ", closing connection explicitly: " + throwable, throwable);
+ try
+ {
+ _logger.error("Exception caught in " + this + ", closing connection explicitly: " + throwable, throwable);
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(getProtocolVersion());
- ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0);
+ MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(getProtocolVersion());
+ ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0);
- writeFrame(closeBody.generateFrame(0));
+ writeFrame(closeBody.generateFrame(0));
- _sender.close();
+ _sender.close();
+ }
+ finally
+ {
+ if(throwable instanceof Error)
+ {
+ throw (Error) throwable;
+ }
+ if(throwable instanceof ServerScopedRuntimeException)
+ {
+ throw (ServerScopedRuntimeException) throwable;
+ }
+
+ }
}
}
@@ -1447,15 +1435,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
writeFrame(responseBody.generateFrame(0));
- try
- {
+ closeSession();
- closeSession();
- }
- catch (AMQException ex)
- {
- throw new RuntimeException(ex);
- }
}
finally
{
@@ -1489,15 +1470,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
try
{
writeFrame(responseBody.generateFrame(channelId));
-
- try
- {
- closeChannel(channelId);
- }
- catch (AMQException ex)
- {
- throw new RuntimeException(ex);
- }
+ closeChannel(channelId);
}
finally
{
@@ -1513,7 +1486,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return getContextKey().toString();
}
- public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
+ public void closeSession(AMQSessionModel session, AMQConstant cause, String message)
{
int channelId = ((AMQChannel)session).getChannelId();
closeChannel(channelId, cause, message);
@@ -1528,7 +1501,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
writeFrame(responseBody.generateFrame(channelId));
}
- public void close(AMQConstant cause, String message) throws AMQException
+ public void close(AMQConstant cause, String message)
{
closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
getProtocolOutputConverter().getProtocolMajorVersion(),
@@ -1670,7 +1643,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
@Override
public void deliverToClient(final Consumer sub, final ServerMessage message,
final InstanceProperties props, final long deliveryTag)
- throws AMQException
{
registerMessageDelivered(message.getSize());
_protocolOutputConverter.writeDeliver(message,