summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java49
1 files changed, 44 insertions, 5 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 30c7403a90..d34290e007 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -308,7 +308,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
protected final FlowControllingBlockingQueue _queue;
/** Holds the highest received delivery tag. */
- private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
+ protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
private final AtomicLong _rollbackMark = new AtomicLong(-1);
/** All the not yet acknowledged message tags */
@@ -630,6 +630,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
acknowledgeImpl();
+ markClean();
}
catch (TransportException e)
{
@@ -855,6 +856,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
//Check that we are clean to commit.
if (_failedOverDirty)
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Session " + _channelId + " was dirty whilst failing over. Rolling back.");
+ }
rollback();
throw new TransactionRolledBackException("Connection failover has occured with uncommitted transaction activity." +
@@ -1813,9 +1818,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
suspendChannel(true);
}
- // Let the dispatcher know that all the incomming messages
- // should be rolled back(reject/release)
- _rollbackMark.set(_highestDeliveryTag.get());
+ setRollbackMark();
syncDispatchQueue();
@@ -2772,6 +2775,21 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
+ /**
+ * Undeclares the specified temporary queue/topic.
+ *
+ * <p/>Note that this operation automatically retries in the event of fail-over.
+ *
+ * @param amqQueue The name of the temporary destination to delete.
+ *
+ * @throws JMSException If the queue could not be deleted for any reason.
+ * @todo Be aware of possible changes to parameter order as versions change.
+ */
+ protected void deleteTemporaryDestination(final TemporaryDestination amqQueue) throws JMSException
+ {
+ deleteQueue(amqQueue.getAMQQueueName());
+ }
+
public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException;
private long getNextProducerId()
@@ -3186,7 +3204,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
setConnectionStopped(true);
}
- _rollbackMark.set(_highestDeliveryTag.get());
+ setRollbackMark();
_dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
@@ -3335,6 +3353,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (!(message instanceof CloseConsumerMessage)
&& tagLE(deliveryTag, _rollbackMark.get()))
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting message because delivery tag " + deliveryTag
+ + " <= rollback mark " + _rollbackMark.get());
+ }
rejectMessage(message, true);
}
else if (_usingDispatcherForCleanup)
@@ -3396,6 +3419,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// Don't reject if we're already closing
if (!_closed.get())
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting message with delivery tag " + message.getDeliveryTag()
+ + " for closing consumer " + String.valueOf(consumer == null? null: consumer._consumerTag));
+ }
rejectMessage(message, true);
}
}
@@ -3526,4 +3554,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
return ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly());
}
+
+ private void setRollbackMark()
+ {
+ // Let the dispatcher know that all the incomming messages
+ // should be rolled back(reject/release)
+ _rollbackMark.set(_highestDeliveryTag.get());
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rollback mark is set to " + _rollbackMark.get());
+ }
+ }
}