diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java | 27 |
1 files changed, 26 insertions, 1 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 246e056f0b..7a3367d215 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.exchange; +import java.util.ArrayList; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.binding.Binding; @@ -47,6 +49,7 @@ import java.util.concurrent.atomic.AtomicLong; public abstract class AbstractExchange implements Exchange { + private static final Logger _logger = Logger.getLogger(AbstractExchange.class); private AMQShortString _name; private final AtomicBoolean _closed = new AtomicBoolean(); @@ -295,7 +298,29 @@ public abstract class AbstractExchange implements Exchange { _receivedMessageCount.incrementAndGet(); _receivedMessageSize.addAndGet(message.getSize()); - final List<? extends BaseQueue> queues = doRoute(message); + List<? extends BaseQueue> queues = doRoute(message); + List<? extends BaseQueue> allQueues = queues; + + boolean deletedQueues = false; + + for(BaseQueue q : allQueues) + { + if(q.isDeleted()) + { + if(!deletedQueues) + { + deletedQueues = true; + queues = new ArrayList<BaseQueue>(allQueues); + } + if(_logger.isDebugEnabled()) + { + _logger.debug("Exchange: " + getName() + " - attempt to enqueue message onto deleted queue " + String.valueOf(q.getNameShortString())); + } + queues.remove(q); + } + } + + if(!queues.isEmpty()) { _routedMessageCount.incrementAndGet(); |