diff options
author | Gordon Sim <gsim@apache.org> | 2007-01-25 18:24:48 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-01-25 18:24:48 +0000 |
commit | f90210810795926ce7a6111b1d4dac9b81df78fa (patch) | |
tree | 181fad6ad1c3aa59334b09e94af962432e1e02bf | |
parent | a1bf3981115c397e55cd7fdc195381a10d5f940c (diff) | |
download | qpid-python-f90210810795926ce7a6111b1d4dac9b81df78fa.tar.gz |
Improved channel/connection exception handling.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@499880 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 99 insertions, 11 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 1cfc90d8e0..a5dfc6d1e5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -25,6 +25,7 @@ import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoSession; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.AMQChannelException; +import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQProtocolVersionException; @@ -246,7 +247,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, throw new AMQException("Incoming request frame on connection which is pending close."); AMQRequestBody requestBody = (AMQRequestBody)frame.bodyFrame; if (!(requestBody.getMethodPayload() instanceof ConnectionCloseOkBody)) - throw new AMQException("Incoming frame on unopened channel is not a Connection.Open method."); + throw new AMQException("Incoming frame on closing connection is not a Connection.CloseOk method."); } else if (channel == null) { @@ -259,8 +260,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, if(!(frame.bodyFrame instanceof AMQRequestBody)) throw new AMQException("Incoming frame on unopened channel is not a request."); AMQRequestBody requestBody = (AMQRequestBody)frame.bodyFrame; - if (!(requestBody.getMethodPayload() instanceof ChannelOpenBody)) - throw new AMQException("Incoming frame on unopened channel is not a Channel.Open method."); + if (!(requestBody.getMethodPayload() instanceof ChannelOpenBody)) { + closeSessionRequest( + requestBody.getMethodPayload().getConnectionException( + 504, "Incoming frame on unopened channel is not a Connection.Open method." + ) + ); + } if (requestBody.getRequestId() != 1) throw new AMQException("Incoming Channel.Open frame on unopened channel does not have a request id = 1."); channel = createChannel(frame.channel); @@ -283,13 +289,29 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private void requestFrameReceived(int channelNum, AMQRequestBody requestBody) throws Exception { - if (_logger.isDebugEnabled()) + try{ + if (_logger.isDebugEnabled()) + { + _logger.debug("Request frame received: " + requestBody); + } + AMQChannel channel = getChannel(channelNum); + ResponseManager responseManager = channel.getResponseManager(); + responseManager.requestReceived(requestBody); + } + catch (AMQChannelException e) { - _logger.debug("Request frame received: " + requestBody); + _logger.error("Closing channel due to: " + e.getMessage()); + writeRequest(channelNum, e.getCloseMethodBody()); + AMQChannel channel = _channelMap.remove(channelNum); + if (channel != null) { + channel.close(this); + } + } + catch (AMQConnectionException e) + { + _logger.error("Closing connection due to: " + e.getMessage()); + closeSessionRequest(e); } - AMQChannel channel = getChannel(channelNum); - ResponseManager responseManager = channel.getResponseManager(); - responseManager.requestReceived(requestBody); } private void responseFrameReceived(int channelNum, AMQResponseBody responseBody) throws Exception @@ -490,6 +512,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { closeSessionRequest(replyCode, replyText, 0, 0); } + + + public void closeSessionRequest(AMQConnectionException e) throws AMQException + { + closeSessionRequest(e.getErrorCode(), e.getMessage(), e.getClassId(), e.getMethodId()); + } + // Used to close a connection as a response to a client close request public void closeSessionResponse(long requestId) throws AMQException diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 0b5c1fbaca..4da086b3bd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -52,6 +52,7 @@ import javax.naming.StringRefAddr; import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.AMQConnectionFailureException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.AMQUnresolvedAddressException; @@ -288,7 +289,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect message = "Unable to Connect"; } - AMQException e = new AMQConnectionException(message); + AMQException e = new AMQConnectionFailureException(message); if (lastException != null) { diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java index 6254d80f32..93754dbee9 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java @@ -21,10 +21,46 @@ package org.apache.qpid; +import org.apache.qpid.framing.ConnectionCloseBody; + public class AMQConnectionException extends AMQException { - public AMQConnectionException(String message) + private final int _classId; + private final int _methodId; + /* AMQP version for which exception ocurred */ + private final byte major; + private final byte minor; + + public AMQConnectionException(int errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t) + { + super(errorCode, msg, t); + _classId = classId; + _methodId = methodId; + this.major = major; + this.minor = minor; + } + + public AMQConnectionException(int errorCode, String msg, int classId, int methodId, byte major, byte minor) + { + super(errorCode, msg); + _classId = classId; + _methodId = methodId; + this.major = major; + this.minor = minor; + } + + public ConnectionCloseBody getCloseMethodBody() + { + return ConnectionCloseBody.createMethodBody(major, minor, _classId, _methodId, getErrorCode(), getMessage()); + } + + public int getClassId() { - super(message); + return _classId; } + + public int getMethodId(){ + return _methodId; + } + } diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java new file mode 100644 index 0000000000..0979598cdb --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java @@ -0,0 +1,9 @@ +package org.apache.qpid; + +public class AMQConnectionFailureException extends AMQException +{ + public AMQConnectionFailureException(String message) + { + super(message); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java index 40cd041ebf..e140a9b334 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java @@ -22,6 +22,7 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQChannelException; +import org.apache.qpid.AMQConnectionException; public abstract class AMQMethodBody extends AMQBody { @@ -103,4 +104,16 @@ public abstract class AMQMethodBody extends AMQBody { return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor, cause); } + + public AMQConnectionException getConnectionException(int code, String message) + { + return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor); + } + + + + public AMQConnectionException getConnectionException(int code, String message, Throwable cause) + { + return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor, cause); + } } |