diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 49 |
1 files changed, 44 insertions, 5 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 30c7403a90..d34290e007 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -308,7 +308,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic protected final FlowControllingBlockingQueue _queue; /** Holds the highest received delivery tag. */ - private final AtomicLong _highestDeliveryTag = new AtomicLong(-1); + protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1); private final AtomicLong _rollbackMark = new AtomicLong(-1); /** All the not yet acknowledged message tags */ @@ -630,6 +630,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { acknowledgeImpl(); + markClean(); } catch (TransportException e) { @@ -855,6 +856,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic //Check that we are clean to commit. if (_failedOverDirty) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Session " + _channelId + " was dirty whilst failing over. Rolling back."); + } rollback(); throw new TransactionRolledBackException("Connection failover has occured with uncommitted transaction activity." + @@ -1813,9 +1818,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic suspendChannel(true); } - // Let the dispatcher know that all the incomming messages - // should be rolled back(reject/release) - _rollbackMark.set(_highestDeliveryTag.get()); + setRollbackMark(); syncDispatchQueue(); @@ -2772,6 +2775,21 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } + /** + * Undeclares the specified temporary queue/topic. + * + * <p/>Note that this operation automatically retries in the event of fail-over. + * + * @param amqQueue The name of the temporary destination to delete. + * + * @throws JMSException If the queue could not be deleted for any reason. + * @todo Be aware of possible changes to parameter order as versions change. + */ + protected void deleteTemporaryDestination(final TemporaryDestination amqQueue) throws JMSException + { + deleteQueue(amqQueue.getAMQQueueName()); + } + public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException; private long getNextProducerId() @@ -3186,7 +3204,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic setConnectionStopped(true); } - _rollbackMark.set(_highestDeliveryTag.get()); + setRollbackMark(); _dispatcherLogger.debug("Session Pre Dispatch Queue cleared"); @@ -3335,6 +3353,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (!(message instanceof CloseConsumerMessage) && tagLE(deliveryTag, _rollbackMark.get())) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting message because delivery tag " + deliveryTag + + " <= rollback mark " + _rollbackMark.get()); + } rejectMessage(message, true); } else if (_usingDispatcherForCleanup) @@ -3396,6 +3419,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // Don't reject if we're already closing if (!_closed.get()) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting message with delivery tag " + message.getDeliveryTag() + + " for closing consumer " + String.valueOf(consumer == null? null: consumer._consumerTag)); + } rejectMessage(message, true); } } @@ -3526,4 +3554,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { return ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()); } + + private void setRollbackMark() + { + // Let the dispatcher know that all the incomming messages + // should be rolled back(reject/release) + _rollbackMark.set(_highestDeliveryTag.get()); + if (_logger.isDebugEnabled()) + { + _logger.debug("Rollback mark is set to " + _rollbackMark.get()); + } + } } |