From 6aecbf875c0baee95e1b9e65524f88a0eaaff31a Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 29 Jan 2014 13:53:53 +0000 Subject: QPID-5522 : TransactionController endless wait when the TCP/IP connection is lost detect link / session / connection failures when sending to, and expecting a response from, the transaction controller git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1562444 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/amqp_1_0/jms/impl/SessionImpl.java | 66 ++++++++++++------- .../org/apache/qpid/amqp_1_0/client/Receive.java | 4 ++ .../org/apache/qpid/amqp_1_0/client/Session.java | 2 +- .../apache/qpid/amqp_1_0/client/Transaction.java | 4 +- .../amqp_1_0/client/TransactionController.java | 75 +++++++++++++++------- 5 files changed, 101 insertions(+), 50 deletions(-) diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java index a1cf0ef4e7..2318b8ba9b 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java @@ -38,15 +38,8 @@ import javax.jms.Queue; import javax.jms.StreamMessage; import javax.jms.TextMessage; import javax.jms.Topic; -import org.apache.qpid.amqp_1_0.client.Connection; -import org.apache.qpid.amqp_1_0.client.ConnectionClosedException; -import org.apache.qpid.amqp_1_0.client.ConnectionErrorException; -import org.apache.qpid.amqp_1_0.client.ConnectionException; -import org.apache.qpid.amqp_1_0.client.ChannelsExhaustedException; -import org.apache.qpid.amqp_1_0.client.Message; -import org.apache.qpid.amqp_1_0.client.Receiver; -import org.apache.qpid.amqp_1_0.client.Sender; -import org.apache.qpid.amqp_1_0.client.Transaction; + +import org.apache.qpid.amqp_1_0.client.*; import org.apache.qpid.amqp_1_0.jms.QueueReceiver; import org.apache.qpid.amqp_1_0.jms.QueueSender; import org.apache.qpid.amqp_1_0.jms.QueueSession; @@ -150,7 +143,17 @@ public class SessionImpl implements Session, QueueSession, TopicSession }); if(_acknowledgeMode == AcknowledgeMode.SESSION_TRANSACTED) { - _txn = _session.createSessionLocalTransaction(); + try + { + _txn = _session.createSessionLocalTransaction(); + } + catch (LinkDetachedException e) + { + JMSException jmsException = new JMSException("Unable to create transactional session"); + jmsException.setLinkedException(e); + jmsException.initCause(e); + throw jmsException; + } } _messageFactory = new MessageFactory(this); @@ -236,14 +239,23 @@ public class SessionImpl implements Session, QueueSession, TopicSession checkClosed(); checkTransactional(); - _txn.commit(); - for(MessageConsumerImpl consumer : _consumers) + try { - consumer.postCommit(); - } + _txn.commit(); + for(MessageConsumerImpl consumer : _consumers) + { + consumer.postCommit(); + } - _txn = _session.createSessionLocalTransaction(); - //TODO + _txn = _session.createSessionLocalTransaction(); + } + catch (LinkDetachedException e) + { + final JMSException jmsException = new JMSException("Unable to commit transaction"); + jmsException.setLinkedException(e); + jmsException.initCause(e); + throw jmsException; + } } public void rollback() throws JMSException @@ -251,16 +263,24 @@ public class SessionImpl implements Session, QueueSession, TopicSession checkClosed(); checkTransactional(); - _txn.rollback(); - - for(MessageConsumerImpl consumer : _consumers) + try { - consumer.postRollback(); - } + _txn.rollback(); - _txn = _session.createSessionLocalTransaction(); + for(MessageConsumerImpl consumer : _consumers) + { + consumer.postRollback(); + } - //TODO + _txn = _session.createSessionLocalTransaction(); + } + catch (LinkDetachedException e) + { + final JMSException jmsException = new JMSException("Unable to rollback transaction"); + jmsException.setLinkedException(e); + jmsException.initCause(e); + throw jmsException; + } } private void checkTransactional() throws JMSException diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java index 6f99bbc2ee..a084c0bacc 100644 --- a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java +++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java @@ -230,6 +230,10 @@ public class Receive extends Util { e.printStackTrace(); //TODO. } + catch (LinkDetachedException e) + { + e.printStackTrace(); + } } diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java index ce1ce512a2..cac4775b54 100644 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java @@ -345,7 +345,7 @@ public class Session } - public Transaction createSessionLocalTransaction() + public Transaction createSessionLocalTransaction() throws LinkDetachedException { TransactionController localController = getSessionLocalTransactionController(); return localController.beginTransaction(); diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java index a379463710..e67f9e2fce 100644 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java @@ -32,12 +32,12 @@ public class Transaction _txnId = txnId; } - public void commit() + public void commit() throws LinkDetachedException { _transactionController.commit(this); } - public void rollback() + public void rollback() throws LinkDetachedException { _transactionController.rollback(this); } diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java index 9f2c76bc72..7bf143cf4b 100644 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java @@ -21,14 +21,17 @@ package org.apache.qpid.amqp_1_0.client; import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; +import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.SendingLinkListener; import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.DeliveryState; import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; import org.apache.qpid.amqp_1_0.type.transaction.Declare; import org.apache.qpid.amqp_1_0.type.transaction.Declared; import org.apache.qpid.amqp_1_0.type.transaction.Discharge; -import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.amqp_1_0.type.transport.*; +import org.apache.qpid.amqp_1_0.type.transport.Error; public class TransactionController implements DeliveryStateHandler @@ -38,15 +41,30 @@ public class TransactionController implements DeliveryStateHandler private Session _session; private volatile DeliveryState _state; private boolean _received; + private Error _error; public TransactionController(Session session, SendingLinkEndpoint tcLinkEndpoint) { _session = session; _endpoint = tcLinkEndpoint; _endpoint.setDeliveryStateHandler(this); + _endpoint.setLinkEventListener(new SendingLinkListener() + { + @Override + public void flowStateChanged() + { + // ignore + } + + @Override + public void remoteDetached(final LinkEndpoint endpoint, final Detach detach) + { + TransactionController.this.remoteDetached(detach); + } + }); } - public Transaction beginTransaction() + public Transaction beginTransaction() throws LinkDetachedException { @@ -54,7 +72,7 @@ public class TransactionController implements DeliveryStateHandler return new Transaction(this, txnId); } - private Binary declare() + private Binary declare() throws LinkDetachedException { SectionEncoder encoder = _session.getSectionEncoder(); @@ -87,9 +105,17 @@ public class TransactionController implements DeliveryStateHandler //TODO - rationalise sending of flows // _endpoint.sendFlow(); } + waitForResponse(); + + + return ((Declared) _state).getTxnId(); + } + + private void waitForResponse() throws LinkDetachedException + { synchronized (this) { - while(!_received) + while(!_received && !_endpoint.isDetached()) { try { @@ -101,23 +127,33 @@ public class TransactionController implements DeliveryStateHandler } } } + if(!_received && _endpoint.isDetached()) + { + throw new LinkDetachedException(_error); + } + } - - return ((Declared) _state).getTxnId(); + private synchronized void remoteDetached(Detach detach) + { + if(detach != null && detach.getError() != null) + { + _error = detach.getError(); + notifyAll(); + } } - public void commit(final Transaction transaction) + public void commit(final Transaction transaction) throws LinkDetachedException { discharge(transaction.getTxnId(), false); } - public void rollback(final Transaction transaction) + public void rollback(final Transaction transaction) throws LinkDetachedException { discharge(transaction.getTxnId(), true); } - private void discharge(final Binary txnId, final boolean fail) + private void discharge(final Binary txnId, final boolean fail) throws LinkDetachedException { Discharge discharge = new Discharge(); discharge.setTxnId(txnId); @@ -135,7 +171,7 @@ public class TransactionController implements DeliveryStateHandler final Object lock = _endpoint.getLock(); synchronized(lock) { - while(!_endpoint.hasCreditToSend()) + while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached()) { try { @@ -146,6 +182,10 @@ public class TransactionController implements DeliveryStateHandler } } + if(_endpoint.isDetached()) + { + throw new LinkDetachedException(_error); + } _state = null; _received = false; _endpoint.transfer(transfer); @@ -153,20 +193,7 @@ public class TransactionController implements DeliveryStateHandler //TODO - rationalise sending of flows // _endpoint.sendFlow(); } - synchronized (this) - { - while(!_received) - { - try - { - wait(); - } - catch (InterruptedException e) - { - - } - } - } + waitForResponse(); } -- cgit v1.2.1