summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2011-05-03 22:46:16 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2011-05-03 22:46:16 +0000
commit57cd30140c3b22186a1dc8358597923cdbf25c73 (patch)
tree14a522edfa0f2ced5d250f5a45d5c68a05f4d530
parentf7dfeca78806072ef81ae561f31915a3a11b0e73 (diff)
downloadqpid-python-57cd30140c3b22186a1dc8358597923cdbf25c73.tar.gz
QPID-3233
If the underlying AMQP session gets closed, the JMS session is now notified along with details that caused the session closure. The JMS Session will throw an exception (with an error code and details) when a user accesses the closed session. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1099288 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java20
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java16
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java32
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java3
5 files changed, 68 insertions, 9 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index ff7e1cfaba..8ac73d730f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -567,6 +567,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
close(-1);
}
+ public abstract AMQException getLastException();
+
public void checkNotClosed() throws JMSException
{
try
@@ -575,16 +577,20 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
catch (IllegalStateException ise)
{
- // if the Connection has closed then we should throw any exception that has occurred that we were not waiting for
- AMQStateManager manager = _connection.getProtocolHandler().getStateManager();
+ AMQException ex = getLastException();
+ if (ex != null)
+ {
+ IllegalStateException ssnClosed = new IllegalStateException(
+ "Session has been closed", ex.getErrorCode().toString());
- if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) && manager.getLastException() != null)
+ ssnClosed.setLinkedException(ex);
+ ssnClosed.initCause(ex);
+ throw ssnClosed;
+ }
+ else
{
- ise.setLinkedException(manager.getLastException());
- ise.initCause(ise.getLinkedException());
+ throw ise;
}
-
- throw ise;
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index be8d39e4b5..851c22f847 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -913,7 +913,21 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
setCurrentException(exc);
}
- public void closed(Session ssn) {}
+ public void closed(Session ssn)
+ {
+ try
+ {
+ super.closed(null);
+ } catch (Exception e)
+ {
+ _logger.error("Error closing JMS session", e);
+ }
+ }
+
+ public AMQException getLastException()
+ {
+ return getCurrentException();
+ }
protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
final boolean noLocal, final boolean nowait)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index f41b1c94fa..c010e4c7ed 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -38,6 +38,7 @@ import org.apache.qpid.client.message.ReturnMessage;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQFrame;
@@ -584,4 +585,35 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
queueName == null ? null : new AMQShortString(queueName),
bindingKey == null ? null : new AMQShortString(bindingKey));
}
+
+
+ public AMQException getLastException()
+ {
+ // if the Connection has closed then we should throw any exception that
+ // has occurred that we were not waiting for
+ AMQStateManager manager = _connection.getProtocolHandler()
+ .getStateManager();
+
+ Exception e = manager.getLastException();
+ if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED)
+ && e != null)
+ {
+ if (e instanceof AMQException)
+ {
+ return (AMQException) e;
+ }
+ else
+ {
+ AMQException amqe = new AMQException(AMQConstant
+ .getConstant(AMQConstant.INTERNAL_ERROR.getCode()),
+ e.getMessage(), e.getCause());
+ return amqe;
+ }
+ }
+ else
+ {
+ return null;
+ }
+ }
+
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
index 47c0359b94..4637c6e505 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
@@ -195,4 +195,10 @@ public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
return false;
}
+
+ @Override
+ public AMQException getLastException()
+ {
+ return null;
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index b3c335ae68..862c37283b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -1031,7 +1031,8 @@ public class Session extends SessionInvoker
if(state == CLOSED)
{
- connection.removeSession(this);
+ connection.removeSession(this);
+ listener.closed(this);
}
}