summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-01-29 13:53:53 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-01-29 13:53:53 +0000
commit6aecbf875c0baee95e1b9e65524f88a0eaaff31a (patch)
treefd53013fe32892cb67c3f0f78ee2a3b7fc8577eb
parent79f63488344f522504df67c2084a06336d9cf601 (diff)
downloadqpid-python-6aecbf875c0baee95e1b9e65524f88a0eaaff31a.tar.gz
QPID-5522 : TransactionController endless wait when the TCP/IP connection is lost
detect link / session / connection failures when sending to, and expecting a response from, the transaction controller git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1562444 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java66
-rw-r--r--java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java4
-rw-r--r--java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java2
-rw-r--r--java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java4
-rw-r--r--java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java75
5 files changed, 101 insertions, 50 deletions
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
index a1cf0ef4e7..2318b8ba9b 100644
--- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
@@ -38,15 +38,8 @@ import javax.jms.Queue;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.Topic;
-import org.apache.qpid.amqp_1_0.client.Connection;
-import org.apache.qpid.amqp_1_0.client.ConnectionClosedException;
-import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
-import org.apache.qpid.amqp_1_0.client.ConnectionException;
-import org.apache.qpid.amqp_1_0.client.ChannelsExhaustedException;
-import org.apache.qpid.amqp_1_0.client.Message;
-import org.apache.qpid.amqp_1_0.client.Receiver;
-import org.apache.qpid.amqp_1_0.client.Sender;
-import org.apache.qpid.amqp_1_0.client.Transaction;
+
+import org.apache.qpid.amqp_1_0.client.*;
import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
import org.apache.qpid.amqp_1_0.jms.QueueSender;
import org.apache.qpid.amqp_1_0.jms.QueueSession;
@@ -150,7 +143,17 @@ public class SessionImpl implements Session, QueueSession, TopicSession
});
if(_acknowledgeMode == AcknowledgeMode.SESSION_TRANSACTED)
{
- _txn = _session.createSessionLocalTransaction();
+ try
+ {
+ _txn = _session.createSessionLocalTransaction();
+ }
+ catch (LinkDetachedException e)
+ {
+ JMSException jmsException = new JMSException("Unable to create transactional session");
+ jmsException.setLinkedException(e);
+ jmsException.initCause(e);
+ throw jmsException;
+ }
}
_messageFactory = new MessageFactory(this);
@@ -236,14 +239,23 @@ public class SessionImpl implements Session, QueueSession, TopicSession
checkClosed();
checkTransactional();
- _txn.commit();
- for(MessageConsumerImpl consumer : _consumers)
+ try
{
- consumer.postCommit();
- }
+ _txn.commit();
+ for(MessageConsumerImpl consumer : _consumers)
+ {
+ consumer.postCommit();
+ }
- _txn = _session.createSessionLocalTransaction();
- //TODO
+ _txn = _session.createSessionLocalTransaction();
+ }
+ catch (LinkDetachedException e)
+ {
+ final JMSException jmsException = new JMSException("Unable to commit transaction");
+ jmsException.setLinkedException(e);
+ jmsException.initCause(e);
+ throw jmsException;
+ }
}
public void rollback() throws JMSException
@@ -251,16 +263,24 @@ public class SessionImpl implements Session, QueueSession, TopicSession
checkClosed();
checkTransactional();
- _txn.rollback();
-
- for(MessageConsumerImpl consumer : _consumers)
+ try
{
- consumer.postRollback();
- }
+ _txn.rollback();
- _txn = _session.createSessionLocalTransaction();
+ for(MessageConsumerImpl consumer : _consumers)
+ {
+ consumer.postRollback();
+ }
- //TODO
+ _txn = _session.createSessionLocalTransaction();
+ }
+ catch (LinkDetachedException e)
+ {
+ final JMSException jmsException = new JMSException("Unable to rollback transaction");
+ jmsException.setLinkedException(e);
+ jmsException.initCause(e);
+ throw jmsException;
+ }
}
private void checkTransactional() throws JMSException
diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java
index 6f99bbc2ee..a084c0bacc 100644
--- a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java
+++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java
@@ -230,6 +230,10 @@ public class Receive extends Util
{
e.printStackTrace(); //TODO.
}
+ catch (LinkDetachedException e)
+ {
+ e.printStackTrace();
+ }
}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
index ce1ce512a2..cac4775b54 100644
--- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
+++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
@@ -345,7 +345,7 @@ public class Session
}
- public Transaction createSessionLocalTransaction()
+ public Transaction createSessionLocalTransaction() throws LinkDetachedException
{
TransactionController localController = getSessionLocalTransactionController();
return localController.beginTransaction();
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java
index a379463710..e67f9e2fce 100644
--- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java
+++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java
@@ -32,12 +32,12 @@ public class Transaction
_txnId = txnId;
}
- public void commit()
+ public void commit() throws LinkDetachedException
{
_transactionController.commit(this);
}
- public void rollback()
+ public void rollback() throws LinkDetachedException
{
_transactionController.rollback(this);
}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java
index 9f2c76bc72..7bf143cf4b 100644
--- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java
+++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java
@@ -21,14 +21,17 @@ package org.apache.qpid.amqp_1_0.client;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.DeliveryState;
import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
import org.apache.qpid.amqp_1_0.type.transaction.Declare;
import org.apache.qpid.amqp_1_0.type.transaction.Declared;
import org.apache.qpid.amqp_1_0.type.transaction.Discharge;
-import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
public class TransactionController implements DeliveryStateHandler
@@ -38,15 +41,30 @@ public class TransactionController implements DeliveryStateHandler
private Session _session;
private volatile DeliveryState _state;
private boolean _received;
+ private Error _error;
public TransactionController(Session session, SendingLinkEndpoint tcLinkEndpoint)
{
_session = session;
_endpoint = tcLinkEndpoint;
_endpoint.setDeliveryStateHandler(this);
+ _endpoint.setLinkEventListener(new SendingLinkListener()
+ {
+ @Override
+ public void flowStateChanged()
+ {
+ // ignore
+ }
+
+ @Override
+ public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
+ {
+ TransactionController.this.remoteDetached(detach);
+ }
+ });
}
- public Transaction beginTransaction()
+ public Transaction beginTransaction() throws LinkDetachedException
{
@@ -54,7 +72,7 @@ public class TransactionController implements DeliveryStateHandler
return new Transaction(this, txnId);
}
- private Binary declare()
+ private Binary declare() throws LinkDetachedException
{
SectionEncoder encoder = _session.getSectionEncoder();
@@ -87,9 +105,17 @@ public class TransactionController implements DeliveryStateHandler
//TODO - rationalise sending of flows
// _endpoint.sendFlow();
}
+ waitForResponse();
+
+
+ return ((Declared) _state).getTxnId();
+ }
+
+ private void waitForResponse() throws LinkDetachedException
+ {
synchronized (this)
{
- while(!_received)
+ while(!_received && !_endpoint.isDetached())
{
try
{
@@ -101,23 +127,33 @@ public class TransactionController implements DeliveryStateHandler
}
}
}
+ if(!_received && _endpoint.isDetached())
+ {
+ throw new LinkDetachedException(_error);
+ }
+ }
-
- return ((Declared) _state).getTxnId();
+ private synchronized void remoteDetached(Detach detach)
+ {
+ if(detach != null && detach.getError() != null)
+ {
+ _error = detach.getError();
+ notifyAll();
+ }
}
- public void commit(final Transaction transaction)
+ public void commit(final Transaction transaction) throws LinkDetachedException
{
discharge(transaction.getTxnId(), false);
}
- public void rollback(final Transaction transaction)
+ public void rollback(final Transaction transaction) throws LinkDetachedException
{
discharge(transaction.getTxnId(), true);
}
- private void discharge(final Binary txnId, final boolean fail)
+ private void discharge(final Binary txnId, final boolean fail) throws LinkDetachedException
{
Discharge discharge = new Discharge();
discharge.setTxnId(txnId);
@@ -135,7 +171,7 @@ public class TransactionController implements DeliveryStateHandler
final Object lock = _endpoint.getLock();
synchronized(lock)
{
- while(!_endpoint.hasCreditToSend())
+ while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached())
{
try
{
@@ -146,6 +182,10 @@ public class TransactionController implements DeliveryStateHandler
}
}
+ if(_endpoint.isDetached())
+ {
+ throw new LinkDetachedException(_error);
+ }
_state = null;
_received = false;
_endpoint.transfer(transfer);
@@ -153,20 +193,7 @@ public class TransactionController implements DeliveryStateHandler
//TODO - rationalise sending of flows
// _endpoint.sendFlow();
}
- synchronized (this)
- {
- while(!_received)
- {
- try
- {
- wait();
- }
- catch (InterruptedException e)
- {
-
- }
- }
- }
+ waitForResponse();
}