diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 121 |
1 files changed, 100 insertions, 21 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index a0b79b135d..804c846572 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -20,11 +20,11 @@ */ package org.apache.qpid.client; +import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.AMQInvalidRoutingKeyException; import org.apache.qpid.AMQUndeliveredException; -import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; @@ -100,6 +100,7 @@ import javax.jms.Topic; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; +import javax.jms.TransactionRolledBackException; import java.io.Serializable; import java.text.MessageFormat; import java.util.ArrayList; @@ -293,6 +294,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private final boolean _strictAMQPFATAL; private final Object _messageDeliveryLock = new Object(); + /** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */ + private boolean _dirty; + /** Has failover occured on this session */ + private boolean _failedOver; + /** * Creates a new session on a connection. * @@ -610,30 +616,65 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkTransacted(); - try + new FailoverNoopSupport<Object, JMSException>(new FailoverProtectedOperation<Object, JMSException>() { - // Acknowledge up to message last delivered (if any) for each consumer. - // need to send ack for messages delivered to consumers so far - for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) + public Object execute() throws JMSException, FailoverException { - // Sends acknowledgement to server - i.next().acknowledgeLastDelivered(); - } + //Check that we are clean to commit. + if (_failedOver && _dirty) + { + rollback(); - // Commits outstanding messages sent and outstanding acknowledgements. - final AMQProtocolHandler handler = getProtocolHandler(); + throw new TransactionRolledBackException("Connection failover has occured since last send. " + + "Forced rollback"); + } - handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), - TxCommitOkBody.class); - } - catch (AMQException e) - { - throw new JMSAMQException("Failed to commit: " + e.getMessage(), e); - } - catch (FailoverException e) - { - throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e); - } + try + { + // Acknowledge up to message last delivered (if any) on this session. + // We only need to find the highest value and ack that as commit is session level. + Long lastTag = -1L; + + for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) + { +// i.next().acknowledgeLastDelivered(); +// } + + // get next acknowledgement to server + Long next = i.next().getLastDelivered(); + if (next != null && next > lastTag) + { + lastTag = next; + } + } + + if (lastTag != -1) + { + acknowledgeMessage(lastTag, true); + } + + // Commits outstanding messages sent and outstanding acknowledgements. + final AMQProtocolHandler handler = getProtocolHandler(); + + handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), + TxCommitOkBody.class); + + markClean(); + } + + catch (AMQException e) + { + throw new JMSAMQException("Failed to commit: " + e.getMessage(), e); + } + + catch (FailoverException e) + { + throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e); + } + + return null; + } + }, _connection).execute(); } public void confirmConsumerCancelled(AMQShortString consumerTag) @@ -1431,6 +1472,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); + markClean(); + if (!isSuspended) { suspendChannel(false); @@ -1731,6 +1774,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ void resubscribe() throws AMQException { + _failedOver = true; resubscribeProducers(); resubscribeConsumers(); } @@ -2532,6 +2576,41 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi return _messageDeliveryLock; } + /** + * Signifies that the session has pending sends to commit. + */ + public void markDirty() + { + _dirty = true; + } + + /** + * Signifies that the session has no pending sends to commit. + */ + public void markClean() + { + _dirty = false; + _failedOver = false; + } + + /** + * Check to see if failover has occured since the last call to markClean(commit or rollback). + * @return boolean true if failover has occured. + */ + public boolean hasFailedOver() + { + return _failedOver; + } + + /** + * Check to see if any message have been sent in this transaction and have not been commited. + * @return boolean true if a message has been sent but not commited + */ + public boolean isDirty() + { + return _dirty; + } + /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ private class Dispatcher extends Thread { |