diff options
Diffstat (limited to 'java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java')
-rw-r--r-- | java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java | 66 |
1 files changed, 43 insertions, 23 deletions
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java index a1cf0ef4e7..2318b8ba9b 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java @@ -38,15 +38,8 @@ import javax.jms.Queue; import javax.jms.StreamMessage; import javax.jms.TextMessage; import javax.jms.Topic; -import org.apache.qpid.amqp_1_0.client.Connection; -import org.apache.qpid.amqp_1_0.client.ConnectionClosedException; -import org.apache.qpid.amqp_1_0.client.ConnectionErrorException; -import org.apache.qpid.amqp_1_0.client.ConnectionException; -import org.apache.qpid.amqp_1_0.client.ChannelsExhaustedException; -import org.apache.qpid.amqp_1_0.client.Message; -import org.apache.qpid.amqp_1_0.client.Receiver; -import org.apache.qpid.amqp_1_0.client.Sender; -import org.apache.qpid.amqp_1_0.client.Transaction; + +import org.apache.qpid.amqp_1_0.client.*; import org.apache.qpid.amqp_1_0.jms.QueueReceiver; import org.apache.qpid.amqp_1_0.jms.QueueSender; import org.apache.qpid.amqp_1_0.jms.QueueSession; @@ -150,7 +143,17 @@ public class SessionImpl implements Session, QueueSession, TopicSession }); if(_acknowledgeMode == AcknowledgeMode.SESSION_TRANSACTED) { - _txn = _session.createSessionLocalTransaction(); + try + { + _txn = _session.createSessionLocalTransaction(); + } + catch (LinkDetachedException e) + { + JMSException jmsException = new JMSException("Unable to create transactional session"); + jmsException.setLinkedException(e); + jmsException.initCause(e); + throw jmsException; + } } _messageFactory = new MessageFactory(this); @@ -236,14 +239,23 @@ public class SessionImpl implements Session, QueueSession, TopicSession checkClosed(); checkTransactional(); - _txn.commit(); - for(MessageConsumerImpl consumer : _consumers) + try { - consumer.postCommit(); - } + _txn.commit(); + for(MessageConsumerImpl consumer : _consumers) + { + consumer.postCommit(); + } - _txn = _session.createSessionLocalTransaction(); - //TODO + _txn = _session.createSessionLocalTransaction(); + } + catch (LinkDetachedException e) + { + final JMSException jmsException = new JMSException("Unable to commit transaction"); + jmsException.setLinkedException(e); + jmsException.initCause(e); + throw jmsException; + } } public void rollback() throws JMSException @@ -251,16 +263,24 @@ public class SessionImpl implements Session, QueueSession, TopicSession checkClosed(); checkTransactional(); - _txn.rollback(); - - for(MessageConsumerImpl consumer : _consumers) + try { - consumer.postRollback(); - } + _txn.rollback(); - _txn = _session.createSessionLocalTransaction(); + for(MessageConsumerImpl consumer : _consumers) + { + consumer.postRollback(); + } - //TODO + _txn = _session.createSessionLocalTransaction(); + } + catch (LinkDetachedException e) + { + final JMSException jmsException = new JMSException("Unable to rollback transaction"); + jmsException.setLinkedException(e); + jmsException.initCause(e); + throw jmsException; + } } private void checkTransactional() throws JMSException |