diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 13 |
1 files changed, 12 insertions, 1 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index fa15df34ec..fc81e32e4d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -60,6 +60,7 @@ import javax.jms.Topic; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; +import javax.jms.TransactionRolledBackException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; @@ -777,8 +778,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { + //Check that we are clean to commit. + if (_failedOverDirty) + { + rollback(); + + throw new TransactionRolledBackException("Connection failover has occured since last send. " + + "Forced rollback"); + } + - // TGM FIXME: what about failover? // Acknowledge all delivered messages while (true) { @@ -1509,6 +1518,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic sendRecover(); + markClean(); + if (!isSuspended) { suspendChannel(false); |