summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java121
1 files changed, 100 insertions, 21 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index a0b79b135d..804c846572 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -20,11 +20,11 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
import org.apache.qpid.AMQUndeliveredException;
-import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -100,6 +100,7 @@ import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
+import javax.jms.TransactionRolledBackException;
import java.io.Serializable;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -293,6 +294,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private final boolean _strictAMQPFATAL;
private final Object _messageDeliveryLock = new Object();
+ /** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */
+ private boolean _dirty;
+ /** Has failover occured on this session */
+ private boolean _failedOver;
+
/**
* Creates a new session on a connection.
*
@@ -610,30 +616,65 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkTransacted();
- try
+ new FailoverNoopSupport<Object, JMSException>(new FailoverProtectedOperation<Object, JMSException>()
{
- // Acknowledge up to message last delivered (if any) for each consumer.
- // need to send ack for messages delivered to consumers so far
- for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+ public Object execute() throws JMSException, FailoverException
{
- // Sends acknowledgement to server
- i.next().acknowledgeLastDelivered();
- }
+ //Check that we are clean to commit.
+ if (_failedOver && _dirty)
+ {
+ rollback();
- // Commits outstanding messages sent and outstanding acknowledgements.
- final AMQProtocolHandler handler = getProtocolHandler();
+ throw new TransactionRolledBackException("Connection failover has occured since last send. " +
+ "Forced rollback");
+ }
- handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()),
- TxCommitOkBody.class);
- }
- catch (AMQException e)
- {
- throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
- }
- catch (FailoverException e)
- {
- throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
- }
+ try
+ {
+ // Acknowledge up to message last delivered (if any) on this session.
+ // We only need to find the highest value and ack that as commit is session level.
+ Long lastTag = -1L;
+
+ for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+ {
+// i.next().acknowledgeLastDelivered();
+// }
+
+ // get next acknowledgement to server
+ Long next = i.next().getLastDelivered();
+ if (next != null && next > lastTag)
+ {
+ lastTag = next;
+ }
+ }
+
+ if (lastTag != -1)
+ {
+ acknowledgeMessage(lastTag, true);
+ }
+
+ // Commits outstanding messages sent and outstanding acknowledgements.
+ final AMQProtocolHandler handler = getProtocolHandler();
+
+ handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()),
+ TxCommitOkBody.class);
+
+ markClean();
+ }
+
+ catch (AMQException e)
+ {
+ throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
+ }
+
+ catch (FailoverException e)
+ {
+ throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
+ }
+
+ return null;
+ }
+ }, _connection).execute();
}
public void confirmConsumerCancelled(AMQShortString consumerTag)
@@ -1431,6 +1472,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId,
getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
+ markClean();
+
if (!isSuspended)
{
suspendChannel(false);
@@ -1731,6 +1774,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
void resubscribe() throws AMQException
{
+ _failedOver = true;
resubscribeProducers();
resubscribeConsumers();
}
@@ -2532,6 +2576,41 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
return _messageDeliveryLock;
}
+ /**
+ * Signifies that the session has pending sends to commit.
+ */
+ public void markDirty()
+ {
+ _dirty = true;
+ }
+
+ /**
+ * Signifies that the session has no pending sends to commit.
+ */
+ public void markClean()
+ {
+ _dirty = false;
+ _failedOver = false;
+ }
+
+ /**
+ * Check to see if failover has occured since the last call to markClean(commit or rollback).
+ * @return boolean true if failover has occured.
+ */
+ public boolean hasFailedOver()
+ {
+ return _failedOver;
+ }
+
+ /**
+ * Check to see if any message have been sent in this transaction and have not been commited.
+ * @return boolean true if a message has been sent but not commited
+ */
+ public boolean isDirty()
+ {
+ return _dirty;
+ }
+
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
private class Dispatcher extends Thread
{