summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-03-26 23:44:39 +0000
committerKeith Wall <kwall@apache.org>2015-03-26 23:44:39 +0000
commitd9f4fdcab179600c43d86fd07949d1c5ecbb1767 (patch)
treed7263757a19b694e121bbd2eadb912e2ad8a9559
parent9cc2707d5306101dcf3fb371624fbc29323c00f6 (diff)
downloadqpid-python-d9f4fdcab179600c43d86fd07949d1c5ecbb1767.tar.gz
QPID-6466: [Java Client] Avoid possibilty that the dispatcher and IO thread can reject/release the same message during consumer close
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1669472 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java59
2 files changed, 68 insertions, 59 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 6c43396481..0ca3505053 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -890,16 +890,17 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
catch(TransportException e)
{
- throw toJMSException("Session exception occured while trying to commit: " + e.getMessage(), e);
+ throw toJMSException("Session exception occurred while trying to commit: " + e.getMessage(), e);
}
}
protected abstract void commitImpl() throws AMQException, FailoverException, TransportException;
+
+
+
public void confirmConsumerCancelled(int consumerTag)
{
-
- // Remove the consumer from the map
C consumer = _consumers.get(consumerTag);
if (consumer != null)
{
@@ -917,7 +918,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
startDispatcherIfNecessary(true);
}
- _dispatcher.rejectPending(consumer);
+ rejectPending(consumer);
}
else // Queue Browser
{
@@ -947,6 +948,18 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
+ private void rejectPending(C consumer)
+ {
+ // Reject messages on pre-receive queue
+ consumer.rollbackPendingMessages();
+
+ // Reject messages on pre-dispatch queue
+ rejectMessagesForConsumerTag(consumer.getConsumerTag());
+
+ // closeConsumer
+ consumer.markClosed();
+ }
+
public QueueBrowser createBrowser(Queue queue) throws JMSException
{
if (isStrictAMQP())
@@ -3077,19 +3090,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_producers.put(producerId, producer);
}
- /**
- * @param consumerTag The consumerTag to prune from queue or all if null
- * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ)
- * @param rejectAllConsumers
- */
-
- private void rejectMessagesForConsumerTag(int consumerTag, boolean requeue, boolean rejectAllConsumers)
+ private void rejectMessagesForConsumerTag(int consumerTag)
{
Iterator<Dispatchable> messages = _queue.iterator();
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:"
- + requeue);
+ _logger.debug("Rejecting messages from _queue for Consumer tag(" + consumerTag + ")");
if (messages.hasNext())
{
@@ -3104,21 +3110,23 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
UnprocessedMessage message = (UnprocessedMessage) messages.next();
- if (rejectAllConsumers || (message.getConsumerTag() == consumerTag))
+ if (message.getConsumerTag() == consumerTag)
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:"
- + message.getDeliveryTag());
- }
- messages.remove();
+ if (_queue.remove(message))
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:"
+ + message.getDeliveryTag());
+ }
- rejectMessage(message, requeue);
+ rejectMessage(message, true);
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Rejected the message(" + message.toString() + ") for consumer :" + consumerTag);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejected the message(" + message.toString() + ") for consumer :" + consumerTag);
+ }
}
}
}
@@ -3288,18 +3296,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
return _closed;
}
- public void rejectPending(C consumer)
- {
- // Reject messages on pre-receive queue
- consumer.rollbackPendingMessages();
-
- // Reject messages on pre-dispatch queue
- rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false);
-
- // closeConsumer
- consumer.markClosed();
-
- }
public void rollback()
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
index df54b7066b..512f6a3b4a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
@@ -59,6 +59,7 @@ public class FlowControllingBlockingQueue<T>
return _queue.isEmpty();
}
+
public interface ThresholdListener
{
void aboveThreshold(int currentValue);
@@ -104,14 +105,7 @@ public class FlowControllingBlockingQueue<T>
if (o != null && !disableFlowControl && _listener != null)
{
- synchronized (_listener)
- {
- if (_count-- == _flowControlLowThreshold)
- {
- _listener.underThreshold(_count);
- }
- }
-
+ reportBelowIfNecessary();
}
return o;
@@ -132,14 +126,7 @@ public class FlowControllingBlockingQueue<T>
}
if (!disableFlowControl && _listener != null)
{
- synchronized (_listener)
- {
- if (_count-- == _flowControlLowThreshold)
- {
- _listener.underThreshold(_count);
- }
- }
-
+ reportBelowIfNecessary();
}
return o;
@@ -155,18 +142,44 @@ public class FlowControllingBlockingQueue<T>
}
if (!disableFlowControl && _listener != null)
{
- synchronized (_listener)
- {
- if (++_count == _flowControlHighThreshold)
- {
- _listener.aboveThreshold(_count);
- }
- }
+ reportAboveIfNecessary();
}
}
+ public boolean remove(final T o)
+ {
+ final boolean removed = _queue.remove(o);
+ if (removed && !disableFlowControl && _listener != null)
+ {
+ reportBelowIfNecessary();
+ }
+ return removed;
+ }
+
public Iterator<T> iterator()
{
return _queue.iterator();
}
+
+ private void reportAboveIfNecessary()
+ {
+ synchronized (_listener)
+ {
+ if (++_count == _flowControlHighThreshold)
+ {
+ _listener.aboveThreshold(_count);
+ }
+ }
+ }
+
+ private void reportBelowIfNecessary()
+ {
+ synchronized (_listener)
+ {
+ if (_count-- == _flowControlLowThreshold)
+ {
+ _listener.underThreshold(_count);
+ }
+ }
+ }
}