summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java62
1 files changed, 54 insertions, 8 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index bf4184bf0b..07813b073b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -29,7 +29,9 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -46,6 +48,8 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
private Exchange _defaultExchange;
private VirtualHost _host;
+ private final Collection<RegistryChangeListener> _listeners =
+ Collections.synchronizedCollection(new ArrayList<RegistryChangeListener>());
public DefaultExchangeRegistry(VirtualHost host)
{
@@ -68,6 +72,14 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
{
_exchangeMap.put(exchange.getNameShortString(), exchange);
_exchangeMapStr.put(exchange.getNameShortString().toString(), exchange);
+ synchronized (_listeners)
+ {
+ for(RegistryChangeListener listener : _listeners)
+ {
+ listener.exchangeRegistered(exchange);
+ }
+
+ }
}
public void setDefaultExchange(Exchange exchange)
@@ -114,6 +126,15 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
getDurableConfigurationStore().removeExchange(e);
}
e.close();
+
+ synchronized (_listeners)
+ {
+ for(RegistryChangeListener listener : _listeners)
+ {
+ listener.exchangeUnregistered(exchange);
+ }
+ }
+
}
else
{
@@ -126,6 +147,16 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
unregisterExchange(new AMQShortString(name), inUse);
}
+ public Collection<Exchange> getExchanges()
+ {
+ return new ArrayList<Exchange>(_exchangeMap.values());
+ }
+
+ public void addRegistryChangeListener(RegistryChangeListener listener)
+ {
+ _listeners.add(listener);
+ }
+
public Exchange getExchange(AMQShortString name)
{
if ((name == null) || name.length() == 0)
@@ -158,16 +189,14 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
{
final Exchange exchange = getExchange(exchangeName);
- if (exchange instanceof AbstractExchange)
+ //TODO: this is a bit of a hack, what if the listeners aren't aware
+ //that we are just unregistering the MBean because of HA, and aren't
+ //actually removing the exchange as such.
+ synchronized (_listeners)
{
- AbstractExchange abstractExchange = (AbstractExchange) exchange;
- try
+ for(RegistryChangeListener listener : _listeners)
{
- abstractExchange.getManagedObject().unregister();
- }
- catch (AMQException e)
- {
- LOGGER.warn("Failed to unregister mbean", e);
+ listener.exchangeUnregistered(exchange);
}
}
}
@@ -196,4 +225,21 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
}
}
+ public boolean isReservedExchangeName(String name)
+ {
+ if (name == null || "".equals(name) || ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString().equals(name)
+ || name.startsWith("amq.") || name.startsWith("qpid."))
+ {
+ return true;
+ }
+ Collection<ExchangeType<? extends Exchange>> registeredTypes = _host.getExchangeFactory().getRegisteredTypes();
+ for (ExchangeType<? extends Exchange> type : registeredTypes)
+ {
+ if (type.getDefaultExchangeName().toString().equals(name))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
}