diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java | 50 |
1 files changed, 38 insertions, 12 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java index 2493974d45..27a9e13617 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java @@ -20,12 +20,10 @@ */ package org.apache.qpid.server.queue; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.exchange.DefaultExchangeRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.ArrayList; import java.util.Collection; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -33,11 +31,11 @@ import java.util.concurrent.ConcurrentMap; public class DefaultQueueRegistry implements QueueRegistry { - private static final Logger LOGGER = Logger.getLogger(DefaultExchangeRegistry.class); - private ConcurrentMap<AMQShortString, AMQQueue> _queueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>(); private final VirtualHost _virtualHost; + private final Collection<RegistryChangeListener> _listeners = + new ArrayList<RegistryChangeListener>(); public DefaultQueueRegistry(VirtualHost virtualHost) { @@ -52,11 +50,28 @@ public class DefaultQueueRegistry implements QueueRegistry public void registerQueue(AMQQueue queue) { _queueMap.put(queue.getNameShortString(), queue); + synchronized (_listeners) + { + for(RegistryChangeListener listener : _listeners) + { + listener.queueRegistered(queue); + } + } } public void unregisterQueue(AMQShortString name) { - _queueMap.remove(name); + AMQQueue q = _queueMap.remove(name); + if(q != null) + { + synchronized (_listeners) + { + for(RegistryChangeListener listener : _listeners) + { + listener.queueUnregistered(q); + } + } + } } public AMQQueue getQueue(AMQShortString name) @@ -79,19 +94,30 @@ public class DefaultQueueRegistry implements QueueRegistry return getQueue(new AMQShortString(queue)); } + public void addRegistryChangeListener(RegistryChangeListener listener) + { + synchronized(_listeners) + { + _listeners.add(listener); + } + } + @Override public void stopAllAndUnregisterMBeans() { for (final AMQQueue queue : getQueues()) { queue.stop(); - try - { - queue.getManagedObject().unregister(); - } - catch (AMQException e) + + //TODO: this is a bit of a hack, what if the listeners aren't aware + //that we are just unregistering the MBean because of HA, and aren't + //actually removing the queue as such. + synchronized (_listeners) { - LOGGER.warn("Failed to unregister mbean", e); + for(RegistryChangeListener listener : _listeners) + { + listener.queueUnregistered(queue); + } } } _queueMap.clear(); |