summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-01-25 18:24:48 +0000
committerGordon Sim <gsim@apache.org>2007-01-25 18:24:48 +0000
commitf90210810795926ce7a6111b1d4dac9b81df78fa (patch)
tree181fad6ad1c3aa59334b09e94af962432e1e02bf
parenta1bf3981115c397e55cd7fdc195381a10d5f940c (diff)
downloadqpid-python-f90210810795926ce7a6111b1d4dac9b81df78fa.tar.gz
Improved channel/connection exception handling.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@499880 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java45
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQConnectionException.java40
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java9
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java13
5 files changed, 99 insertions, 11 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 1cfc90d8e0..a5dfc6d1e5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -25,6 +25,7 @@ import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoSession;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQProtocolVersionException;
@@ -246,7 +247,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
throw new AMQException("Incoming request frame on connection which is pending close.");
AMQRequestBody requestBody = (AMQRequestBody)frame.bodyFrame;
if (!(requestBody.getMethodPayload() instanceof ConnectionCloseOkBody))
- throw new AMQException("Incoming frame on unopened channel is not a Connection.Open method.");
+ throw new AMQException("Incoming frame on closing connection is not a Connection.CloseOk method.");
}
else if (channel == null)
{
@@ -259,8 +260,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
if(!(frame.bodyFrame instanceof AMQRequestBody))
throw new AMQException("Incoming frame on unopened channel is not a request.");
AMQRequestBody requestBody = (AMQRequestBody)frame.bodyFrame;
- if (!(requestBody.getMethodPayload() instanceof ChannelOpenBody))
- throw new AMQException("Incoming frame on unopened channel is not a Channel.Open method.");
+ if (!(requestBody.getMethodPayload() instanceof ChannelOpenBody)) {
+ closeSessionRequest(
+ requestBody.getMethodPayload().getConnectionException(
+ 504, "Incoming frame on unopened channel is not a Connection.Open method."
+ )
+ );
+ }
if (requestBody.getRequestId() != 1)
throw new AMQException("Incoming Channel.Open frame on unopened channel does not have a request id = 1.");
channel = createChannel(frame.channel);
@@ -283,13 +289,29 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private void requestFrameReceived(int channelNum, AMQRequestBody requestBody) throws Exception
{
- if (_logger.isDebugEnabled())
+ try{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Request frame received: " + requestBody);
+ }
+ AMQChannel channel = getChannel(channelNum);
+ ResponseManager responseManager = channel.getResponseManager();
+ responseManager.requestReceived(requestBody);
+ }
+ catch (AMQChannelException e)
{
- _logger.debug("Request frame received: " + requestBody);
+ _logger.error("Closing channel due to: " + e.getMessage());
+ writeRequest(channelNum, e.getCloseMethodBody());
+ AMQChannel channel = _channelMap.remove(channelNum);
+ if (channel != null) {
+ channel.close(this);
+ }
+ }
+ catch (AMQConnectionException e)
+ {
+ _logger.error("Closing connection due to: " + e.getMessage());
+ closeSessionRequest(e);
}
- AMQChannel channel = getChannel(channelNum);
- ResponseManager responseManager = channel.getResponseManager();
- responseManager.requestReceived(requestBody);
}
private void responseFrameReceived(int channelNum, AMQResponseBody responseBody) throws Exception
@@ -490,6 +512,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
closeSessionRequest(replyCode, replyText, 0, 0);
}
+
+
+ public void closeSessionRequest(AMQConnectionException e) throws AMQException
+ {
+ closeSessionRequest(e.getErrorCode(), e.getMessage(), e.getClassId(), e.getMethodId());
+ }
+
// Used to close a connection as a response to a client close request
public void closeSessionResponse(long requestId) throws AMQException
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 0b5c1fbaca..4da086b3bd 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -52,6 +52,7 @@ import javax.naming.StringRefAddr;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.AMQUnresolvedAddressException;
@@ -288,7 +289,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
message = "Unable to Connect";
}
- AMQException e = new AMQConnectionException(message);
+ AMQException e = new AMQConnectionFailureException(message);
if (lastException != null)
{
diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
index 6254d80f32..93754dbee9 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
@@ -21,10 +21,46 @@
package org.apache.qpid;
+import org.apache.qpid.framing.ConnectionCloseBody;
+
public class AMQConnectionException extends AMQException
{
- public AMQConnectionException(String message)
+ private final int _classId;
+ private final int _methodId;
+ /* AMQP version for which exception ocurred */
+ private final byte major;
+ private final byte minor;
+
+ public AMQConnectionException(int errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t)
+ {
+ super(errorCode, msg, t);
+ _classId = classId;
+ _methodId = methodId;
+ this.major = major;
+ this.minor = minor;
+ }
+
+ public AMQConnectionException(int errorCode, String msg, int classId, int methodId, byte major, byte minor)
+ {
+ super(errorCode, msg);
+ _classId = classId;
+ _methodId = methodId;
+ this.major = major;
+ this.minor = minor;
+ }
+
+ public ConnectionCloseBody getCloseMethodBody()
+ {
+ return ConnectionCloseBody.createMethodBody(major, minor, _classId, _methodId, getErrorCode(), getMessage());
+ }
+
+ public int getClassId()
{
- super(message);
+ return _classId;
}
+
+ public int getMethodId(){
+ return _methodId;
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
new file mode 100644
index 0000000000..0979598cdb
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
@@ -0,0 +1,9 @@
+package org.apache.qpid;
+
+public class AMQConnectionFailureException extends AMQException
+{
+ public AMQConnectionFailureException(String message)
+ {
+ super(message);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
index 40cd041ebf..e140a9b334 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
@@ -22,6 +22,7 @@ package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.AMQConnectionException;
public abstract class AMQMethodBody extends AMQBody
{
@@ -103,4 +104,16 @@ public abstract class AMQMethodBody extends AMQBody
{
return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor, cause);
}
+
+ public AMQConnectionException getConnectionException(int code, String message)
+ {
+ return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor);
+ }
+
+
+
+ public AMQConnectionException getConnectionException(int code, String message, Throwable cause)
+ {
+ return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor, cause);
+ }
}