diff options
author | Keith Wall <kwall@apache.org> | 2015-03-26 23:44:39 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2015-03-26 23:44:39 +0000 |
commit | d9f4fdcab179600c43d86fd07949d1c5ecbb1767 (patch) | |
tree | d7263757a19b694e121bbd2eadb912e2ad8a9559 | |
parent | 9cc2707d5306101dcf3fb371624fbc29323c00f6 (diff) | |
download | qpid-python-d9f4fdcab179600c43d86fd07949d1c5ecbb1767.tar.gz |
QPID-6466: [Java Client] Avoid possibilty that the dispatcher and IO thread can reject/release the same message during consumer close
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1669472 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 68 | ||||
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java | 59 |
2 files changed, 68 insertions, 59 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 6c43396481..0ca3505053 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -890,16 +890,17 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } catch(TransportException e) { - throw toJMSException("Session exception occured while trying to commit: " + e.getMessage(), e); + throw toJMSException("Session exception occurred while trying to commit: " + e.getMessage(), e); } } protected abstract void commitImpl() throws AMQException, FailoverException, TransportException; + + + public void confirmConsumerCancelled(int consumerTag) { - - // Remove the consumer from the map C consumer = _consumers.get(consumerTag); if (consumer != null) { @@ -917,7 +918,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic startDispatcherIfNecessary(true); } - _dispatcher.rejectPending(consumer); + rejectPending(consumer); } else // Queue Browser { @@ -947,6 +948,18 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } + private void rejectPending(C consumer) + { + // Reject messages on pre-receive queue + consumer.rollbackPendingMessages(); + + // Reject messages on pre-dispatch queue + rejectMessagesForConsumerTag(consumer.getConsumerTag()); + + // closeConsumer + consumer.markClosed(); + } + public QueueBrowser createBrowser(Queue queue) throws JMSException { if (isStrictAMQP()) @@ -3077,19 +3090,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _producers.put(producerId, producer); } - /** - * @param consumerTag The consumerTag to prune from queue or all if null - * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ) - * @param rejectAllConsumers - */ - - private void rejectMessagesForConsumerTag(int consumerTag, boolean requeue, boolean rejectAllConsumers) + private void rejectMessagesForConsumerTag(int consumerTag) { Iterator<Dispatchable> messages = _queue.iterator(); if (_logger.isDebugEnabled()) { - _logger.debug("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:" - + requeue); + _logger.debug("Rejecting messages from _queue for Consumer tag(" + consumerTag + ")"); if (messages.hasNext()) { @@ -3104,21 +3110,23 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { UnprocessedMessage message = (UnprocessedMessage) messages.next(); - if (rejectAllConsumers || (message.getConsumerTag() == consumerTag)) + if (message.getConsumerTag() == consumerTag) { - if (_logger.isDebugEnabled()) - { - _logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:" - + message.getDeliveryTag()); - } - messages.remove(); + if (_queue.remove(message)) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:" + + message.getDeliveryTag()); + } - rejectMessage(message, requeue); + rejectMessage(message, true); - if (_logger.isDebugEnabled()) - { - _logger.debug("Rejected the message(" + message.toString() + ") for consumer :" + consumerTag); + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejected the message(" + message.toString() + ") for consumer :" + consumerTag); + } } } } @@ -3288,18 +3296,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic return _closed; } - public void rejectPending(C consumer) - { - // Reject messages on pre-receive queue - consumer.rollbackPendingMessages(); - - // Reject messages on pre-dispatch queue - rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false); - - // closeConsumer - consumer.markClosed(); - - } public void rollback() { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index df54b7066b..512f6a3b4a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -59,6 +59,7 @@ public class FlowControllingBlockingQueue<T> return _queue.isEmpty(); } + public interface ThresholdListener { void aboveThreshold(int currentValue); @@ -104,14 +105,7 @@ public class FlowControllingBlockingQueue<T> if (o != null && !disableFlowControl && _listener != null) { - synchronized (_listener) - { - if (_count-- == _flowControlLowThreshold) - { - _listener.underThreshold(_count); - } - } - + reportBelowIfNecessary(); } return o; @@ -132,14 +126,7 @@ public class FlowControllingBlockingQueue<T> } if (!disableFlowControl && _listener != null) { - synchronized (_listener) - { - if (_count-- == _flowControlLowThreshold) - { - _listener.underThreshold(_count); - } - } - + reportBelowIfNecessary(); } return o; @@ -155,18 +142,44 @@ public class FlowControllingBlockingQueue<T> } if (!disableFlowControl && _listener != null) { - synchronized (_listener) - { - if (++_count == _flowControlHighThreshold) - { - _listener.aboveThreshold(_count); - } - } + reportAboveIfNecessary(); } } + public boolean remove(final T o) + { + final boolean removed = _queue.remove(o); + if (removed && !disableFlowControl && _listener != null) + { + reportBelowIfNecessary(); + } + return removed; + } + public Iterator<T> iterator() { return _queue.iterator(); } + + private void reportAboveIfNecessary() + { + synchronized (_listener) + { + if (++_count == _flowControlHighThreshold) + { + _listener.aboveThreshold(_count); + } + } + } + + private void reportBelowIfNecessary() + { + synchronized (_listener) + { + if (_count-- == _flowControlLowThreshold) + { + _listener.underThreshold(_count); + } + } + } } |