diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2013-07-05 15:30:53 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2013-07-05 15:30:53 +0000 |
commit | 77171f498c5c7dca09448ce8168c3bd30bfe3825 (patch) | |
tree | c03d565b4cd8e9ff847f2599d5abfa84c6886ccf | |
parent | 602ae3b5f7cc7dbe429532466237e2e943ae2059 (diff) | |
download | qpid-python-77171f498c5c7dca09448ce8168c3bd30bfe3825.tar.gz |
QPID-4973 : [Java Broker] Refactor DurableConfigurationStore interface to be in terms of ConfiguredObject rather than implementation classes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1500047 13f79535-47bb-0310-9956-ffa450edef68
49 files changed, 865 insertions, 512 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java index 8cf1ad8a83..be91e4a484 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java @@ -70,7 +70,7 @@ public class BDBHAVirtualHost extends AbstractVirtualHost _messageStore.addEventListener(new AfterInitialisationListener(), Event.AFTER_INIT); _messageStore.addEventListener(new BeforePassivationListener(), Event.BEFORE_PASSIVATE); - VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this); + VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getExchangeRegistry(), getExchangeFactory()); _messageStore.configureConfigStore(getName(), recoveryHandler, diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 3f6489cb86..53dd6df599 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -63,7 +63,6 @@ public abstract class AbstractExchange implements Exchange private Exchange _alternateExchange; private boolean _durable; - private int _ticket; private VirtualHost _virtualHost; @@ -109,14 +108,17 @@ public abstract class AbstractExchange implements Exchange return _type.getName(); } - public void initialise(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) + public void initialise(UUID id, + VirtualHost host, + AMQShortString name, + boolean durable, + boolean autoDelete) throws AMQException { _virtualHost = host; _name = name; _durable = durable; _autoDelete = autoDelete; - _ticket = ticket; _id = id; _logSubject = new ExchangeLogSubject(this, this.getVirtualHost()); @@ -135,11 +137,6 @@ public abstract class AbstractExchange implements Exchange return _autoDelete; } - public int getTicket() - { - return _ticket; - } - public void close() throws AMQException { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java index dad6e60bfe..2873eb31e8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java @@ -49,7 +49,6 @@ public class DefaultExchange implements Exchange private UUID _id; private VirtualHost _virtualHost; - private int _ticket; private static final Logger _logger = Logger.getLogger(DefaultExchange.class); private final AtomicBoolean _closed = new AtomicBoolean(); @@ -62,12 +61,10 @@ public class DefaultExchange implements Exchange VirtualHost host, AMQShortString name, boolean durable, - int ticket, boolean autoDelete) throws AMQException { _id = id; _virtualHost = host; - _ticket = ticket; } @Override @@ -197,12 +194,6 @@ public class DefaultExchange implements Exchange } @Override - public int getTicket() - { - return _ticket; - } - - @Override public void close() throws AMQException { if(_closed.compareAndSet(false,true)) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index 5e6e36d330..a0b80a601c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -102,51 +102,45 @@ public class DefaultExchangeFactory implements ExchangeFactory public Collection<ExchangeType<? extends Exchange>> getPublicCreatableTypes() { - Collection<ExchangeType<? extends Exchange>> publicTypes = + Collection<ExchangeType<? extends Exchange>> publicTypes = new ArrayList<ExchangeType<? extends Exchange>>(); publicTypes.addAll(_exchangeClassMap.values()); - + return publicTypes; } public Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete) - throws AMQException + throws AMQException { - return createExchange(new AMQShortString(exchange), new AMQShortString(type), durable, autoDelete, 0); - } - public Exchange createExchange(UUID id, String exchange, String type, boolean durable, boolean autoDelete) - throws AMQException - { - return createExchange(id, new AMQShortString(exchange), new AMQShortString(type), durable, autoDelete, 0); + UUID id = UUIDGenerator.generateExchangeUUID(exchange, _host.getName()); + return createExchange(id, new AMQShortString(exchange), new AMQShortString(type), durable, autoDelete); } - public Exchange createExchange(AMQShortString exchange, AMQShortString type, boolean durable, - boolean autoDelete, int ticket) + public Exchange createExchange(UUID id, String exchange, String type, boolean durable, boolean autoDelete) throws AMQException { - UUID id = UUIDGenerator.generateExchangeUUID(exchange.asString(), _host.getName()); - return createExchange(id, exchange, type, durable, autoDelete, ticket); + return createExchange(id, new AMQShortString(exchange), new AMQShortString(type), durable, autoDelete); } - public Exchange createExchange(UUID id, AMQShortString exchange, AMQShortString type, boolean durable, - boolean autoDelete, int ticket) + private Exchange createExchange(UUID id, AMQShortString exchange, AMQShortString type, boolean durable, + boolean autoDelete) throws AMQException { // Check access if (!_host.getSecurityManager().authoriseCreateExchange(autoDelete, durable, exchange, null, null, null, type)) { - String description = "Permission denied: exchange-name '" + exchange.asString() + "'"; + String description = "Permission denied: exchange-name '" + exchange + "'"; throw new AMQSecurityException(description); } - + ExchangeType<? extends Exchange> exchType = _exchangeClassMap.get(type); if (exchType == null) { throw new AMQUnknownExchangeType("Unknown exchange type: " + type,null); } - - Exchange e = exchType.newInstance(id, _host, exchange, durable, ticket, autoDelete); + + Exchange e = exchType.newInstance(id, _host, exchange, durable, autoDelete); return e; } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index 68c15779a0..75c489c731 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -24,12 +24,10 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.ArrayList; @@ -46,8 +44,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry /** * Maps from exchange name to exchange instance */ - private ConcurrentMap<AMQShortString, Exchange> _exchangeMap = new ConcurrentHashMap<AMQShortString, Exchange>(); - private ConcurrentMap<String, Exchange> _exchangeMapStr = new ConcurrentHashMap<String, Exchange>(); + private ConcurrentMap<String, Exchange> _exchangeMap = new ConcurrentHashMap<String, Exchange>(); private Exchange _defaultExchange; private VirtualHost _host; @@ -59,17 +56,17 @@ public class DefaultExchangeRegistry implements ExchangeRegistry _host = host; } - public void initialise() throws AMQException + public void initialise(ExchangeFactory exchangeFactory) throws AMQException { //create 'standard' exchanges: - new ExchangeInitialiser().initialise(_host.getExchangeFactory(), this, getDurableConfigurationStore()); + new ExchangeInitialiser().initialise(exchangeFactory, this, getDurableConfigurationStore()); _defaultExchange = new DefaultExchange(); UUID defaultExchangeId = UUIDGenerator.generateExchangeUUID(ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString(), _host.getName()); - _defaultExchange.initialise(defaultExchangeId, _host, ExchangeDefaults.DEFAULT_EXCHANGE_NAME,false,0,false); + _defaultExchange.initialise(defaultExchangeId, _host, ExchangeDefaults.DEFAULT_EXCHANGE_NAME,false, false); } @@ -80,8 +77,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public void registerExchange(Exchange exchange) throws AMQException { - _exchangeMap.put(exchange.getNameShortString(), exchange); - _exchangeMapStr.put(exchange.getNameShortString().toString(), exchange); + _exchangeMap.put(exchange.getNameShortString().toString(), exchange); synchronized (_listeners) { for(RegistryChangeListener listener : _listeners) @@ -102,12 +98,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry return _defaultExchange; } - public Collection<AMQShortString> getExchangeNames() - { - return _exchangeMap.keySet(); - } - - public void unregisterExchange(AMQShortString name, boolean inUse) throws AMQException + public void unregisterExchange(String name, boolean inUse) throws AMQException { final Exchange exchange = _exchangeMap.get(name); if (exchange == null) @@ -123,13 +114,8 @@ public class DefaultExchangeRegistry implements ExchangeRegistry // TODO: check inUse argument Exchange e = _exchangeMap.remove(name); - _exchangeMapStr.remove(name.toString()); if (e != null) { - if (e.isDurable()) - { - DurableConfigurationStoreHelper.removeExchange(getDurableConfigurationStore(), e); - } e.close(); synchronized (_listeners) @@ -147,11 +133,6 @@ public class DefaultExchangeRegistry implements ExchangeRegistry } } - public void unregisterExchange(String name, boolean inUse) throws AMQException - { - unregisterExchange(new AMQShortString(name), inUse); - } - public Collection<Exchange> getExchanges() { return new ArrayList<Exchange>(_exchangeMap.values()); @@ -162,19 +143,6 @@ public class DefaultExchangeRegistry implements ExchangeRegistry _listeners.add(listener); } - public Exchange getExchange(AMQShortString name) - { - if ((name == null) || name.length() == 0) - { - return getDefaultExchange(); - } - else - { - return _exchangeMap.get(name); - } - - } - public Exchange getExchange(String name) { if ((name == null) || name.length() == 0) @@ -183,17 +151,15 @@ public class DefaultExchangeRegistry implements ExchangeRegistry } else { - return _exchangeMapStr.get(name); + return _exchangeMap.get(name); } } @Override public void clearAndUnregisterMbeans() { - for (final AMQShortString exchangeName : getExchangeNames()) + for (final Exchange exchange : getExchanges()) { - final Exchange exchange = getExchange(exchangeName); - //TODO: this is a bit of a hack, what if the listeners aren't aware //that we are just unregistering the MBean because of HA, and aren't //actually removing the exchange as such. @@ -206,7 +172,6 @@ public class DefaultExchangeRegistry implements ExchangeRegistry } } _exchangeMap.clear(); - _exchangeMapStr.clear(); } @Override @@ -237,7 +202,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry { return true; } - Collection<ExchangeType<? extends Exchange>> registeredTypes = _host.getExchangeFactory().getRegisteredTypes(); + Collection<ExchangeType<? extends Exchange>> registeredTypes = _host.getExchangeTypes(); for (ExchangeType<? extends Exchange> type : registeredTypes) { if (type.getDefaultExchangeName().toString().equals(name)) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java index 096d5265ed..c193764edc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java @@ -36,13 +36,12 @@ public class DirectExchangeType implements ExchangeType<DirectExchange> } public DirectExchange newInstance(UUID id, VirtualHost host, - AMQShortString name, - boolean durable, - int ticket, - boolean autoDelete) throws AMQException + AMQShortString name, + boolean durable, + boolean autoDelete) throws AMQException { DirectExchange exch = new DirectExchange(); - exch.initialise(id, host,name,durable,ticket,autoDelete); + exch.initialise(id, host,name,durable, autoDelete); return exch; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index b632c68ace..735072cc82 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -39,7 +39,7 @@ import java.util.UUID; public interface Exchange extends ExchangeReferrer { - void initialise(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) + void initialise(UUID id, VirtualHost host, AMQShortString name, boolean durable, boolean autoDelete) throws AMQException; @@ -60,8 +60,6 @@ public interface Exchange extends ExchangeReferrer */ boolean isAutoDelete(); - int getTicket(); - Exchange getAlternateExchange(); void setAlternateExchange(Exchange exchange); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java index e602d476d9..3ccfa51499 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -30,19 +30,13 @@ import java.util.UUID; public interface ExchangeFactory { - Exchange createExchange(AMQShortString exchange, AMQShortString type, boolean durable, boolean autoDelete, - int ticket) - throws AMQException; Collection<ExchangeType<? extends Exchange>> getRegisteredTypes(); - + Collection<ExchangeType<? extends Exchange>> getPublicCreatableTypes(); Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete) throws AMQException; Exchange createExchange(UUID id, String exchange, String type, boolean durable, boolean autoDelete) throws AMQException; - Exchange createExchange(UUID id, AMQShortString exchange, AMQShortString type, boolean durable, - boolean autoDelete, int ticket) - throws AMQException; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java index 10a733546c..fd7c6a7fe0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java @@ -33,17 +33,17 @@ public class ExchangeInitialiser { for (ExchangeType<? extends Exchange> type : factory.getRegisteredTypes()) { - define (registry, factory, type.getDefaultExchangeName(), type.getName(), store); + define (registry, factory, type.getDefaultExchangeName().toString(), type.getName().toString(), store); } } private void define(ExchangeRegistry r, ExchangeFactory f, - AMQShortString name, AMQShortString type, DurableConfigurationStore store) throws AMQException + String name, String type, DurableConfigurationStore store) throws AMQException { if(r.getExchange(name)== null) { - Exchange exchange = f.createExchange(name, type, true, false, 0); + Exchange exchange = f.createExchange(name, type, true, false); r.registerExchange(exchange); if(exchange.isDurable()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java index 4dcedb4797..53ee7de661 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java @@ -31,27 +31,19 @@ public interface ExchangeRegistry { void registerExchange(Exchange exchange) throws AMQException; - /** - * Unregister an exchange - * @param name name of the exchange to delete - * @param inUse if true, do NOT delete the exchange if it is in use (has queues bound to it) - * @throws ExchangeInUseException when the exchange cannot be deleted because it is in use - * @throws AMQException - */ - void unregisterExchange(AMQShortString name, boolean inUse) throws ExchangeInUseException, AMQException; - - Exchange getExchange(AMQShortString name); - - void setDefaultExchange(Exchange exchange); - Exchange getDefaultExchange(); - Collection<AMQShortString> getExchangeNames(); - - void initialise() throws AMQException; + void initialise(ExchangeFactory exchangeFactory) throws AMQException; Exchange getExchange(String exchangeName); + /** + * Unregister an exchange + * @param exchange name of the exchange to delete + * @param ifUnused if true, do NOT delete the exchange if it is in use (has queues bound to it) + * @throws ExchangeInUseException when the exchange cannot be deleted because it is in use + * @throws AMQException + */ void unregisterExchange(String exchange, boolean ifUnused) throws ExchangeInUseException, AMQException; void clearAndUnregisterMbeans(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java index 0371a363de..587761b64e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java @@ -36,11 +36,11 @@ public class FanoutExchangeType implements ExchangeType<FanoutExchange> } public FanoutExchange newInstance(UUID id, VirtualHost host, AMQShortString name, - boolean durable, int ticket, boolean autoDelete) + boolean durable, boolean autoDelete) throws AMQException { FanoutExchange exch = new FanoutExchange(); - exch.initialise(id, host, name, durable, ticket, autoDelete); + exch.initialise(id, host, name, durable, autoDelete); return exch; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java index ed4d57d0f8..1c99fbb364 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java @@ -35,12 +35,12 @@ public class HeadersExchangeType implements ExchangeType<HeadersExchange> return ExchangeDefaults.HEADERS_EXCHANGE_CLASS; } - public HeadersExchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, - boolean autoDelete) throws AMQException + public HeadersExchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable, + boolean autoDelete) throws AMQException { HeadersExchange exch = new HeadersExchange(); - exch.initialise(id, host, name, durable, ticket, autoDelete); + exch.initialise(id, host, name, durable, autoDelete); return exch; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java index 25a3549e61..d921901f0f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java @@ -36,13 +36,12 @@ public class TopicExchangeType implements ExchangeType<TopicExchange> } public TopicExchange newInstance(UUID id, VirtualHost host, - AMQShortString name, - boolean durable, - int ticket, - boolean autoDelete) throws AMQException + AMQShortString name, + boolean durable, + boolean autoDelete) throws AMQException { TopicExchange exch = new TopicExchange(); - exch.initialise(id, host, name, durable, ticket, autoDelete); + exch.initialise(id, host, name, durable, autoDelete); return exch; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java index cb8918e847..85fee94143 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java @@ -67,7 +67,7 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basic } VirtualHost vHost = session.getVirtualHost(); - Exchange exch = vHost.getExchangeRegistry().getExchange(exchangeName); + Exchange exch = vHost.getExchange(exchangeName.toString()); // if the exchange does not exist we raise a channel exception if (exch == null) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java index 2e6a98d81b..8493e97215 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java @@ -88,7 +88,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo { throw new AMQException("Exchange exchange must not be null"); } - Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName); + Exchange exchange = virtualHost.getExchange(exchangeName.toString()); ExchangeBoundOkBody response; if (exchange == null) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index b3967689dc..437869ab01 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -32,12 +32,11 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; -import org.apache.qpid.server.store.DurableConfigurationStoreHelper; +import org.apache.qpid.server.virtualhost.ExchangeExistsException; +import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; import org.apache.qpid.server.virtualhost.VirtualHost; public class ExchangeDeclareHandler implements StateAwareMethodListener<ExchangeDeclareBody> @@ -59,8 +58,6 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange { AMQProtocolSession session = stateManager.getProtocolSession(); VirtualHost virtualHost = session.getVirtualHost(); - ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); - ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory(); final AMQChannel channel = session.getChannel(channelId); if (channel == null) { @@ -73,57 +70,63 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange _logger.debug("Request to declare exchange of type " + body.getType() + " with name " + exchangeName); } - synchronized(exchangeRegistry) + Exchange exchange; + + if (body.getPassive()) { - Exchange exchange = exchangeRegistry.getExchange(exchangeName); + exchange = virtualHost.getExchange(exchangeName == null ? null : exchangeName.toString()); + if(exchange == null) + { + throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName); + } + else if (!exchange.getTypeShortString().equals(body.getType()) && !(body.getType() == null || body.getType().length() ==0)) + { + + throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + + exchangeName + " of type " + exchange.getTypeShortString() + + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null); + } - if (exchange == null) + } + else + { + try { - if(body.getPassive() && ((body.getType() == null) || body.getType().length() ==0)) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName); - } - else if(exchangeName.startsWith("amq.")) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Attempt to declare exchange: " + exchangeName + - " which begins with reserved prefix 'amq.'."); - } - else if(exchangeName.startsWith("qpid.")) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Attempt to declare exchange: " + exchangeName + - " which begins with reserved prefix 'qpid.'."); - } - else + exchange = virtualHost.createExchange(null, + exchangeName == null ? null : exchangeName.intern().toString(), + body.getType() == null ? null : body.getType().intern().toString(), + body.getDurable(), + body.getAutoDelete(), + null); + + } + catch(ReservedExchangeNameException e) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Attempt to declare exchange: " + exchangeName + + " which begins with reserved prefix."); + + } + catch(ExchangeExistsException e) + { + exchange = e.getExistingExchange(); + if(!exchange.getTypeShortString().equals(body.getType())) { - try - { - exchange = exchangeFactory.createExchange(exchangeName == null ? null : exchangeName.intern(), - body.getType() == null ? null : body.getType().intern(), - body.getDurable(), - body.getAutoDelete(), body.getTicket()); - exchangeRegistry.registerExchange(exchange); - - if (exchange.isDurable()) - { - DurableConfigurationStoreHelper.createExchange(virtualHost.getDurableConfigurationStore(), - exchange); - } - } - catch(AMQUnknownExchangeType e) - { - throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + exchangeName,e); - } + throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + + exchangeName + " of type " + + exchange.getTypeShortString() + + " to " + body.getType() +".", + body.getClazz(), body.getMethod(), + body.getMajor(), body.getMinor(),null); } } - else if (!exchange.getTypeShortString().equals(body.getType()) && !((body.getType() == null || body.getType().length() ==0) && body.getPassive())) + catch(AMQUnknownExchangeType e) { - - throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + - exchangeName + " of type " + exchange.getTypeShortString() + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null); + throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + exchangeName,e); } } + + if(!body.getNowait()) { MethodRegistry methodRegistry = session.getMethodRegistry(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java index 339085691f..25d0182202 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,16 +21,21 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ExchangeDeleteBody; import org.apache.qpid.framing.ExchangeDeleteOkBody; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeInUseException; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; +import org.apache.qpid.server.virtualhost.RequiredExchangeException; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.ExecutionErrorCode; public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeDeleteBody> { @@ -49,7 +54,6 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD { AMQProtocolSession session = stateManager.getProtocolSession(); VirtualHost virtualHost = session.getVirtualHost(); - ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); final AMQChannel channel = session.getChannel(channelId); if (channel == null) { @@ -58,14 +62,18 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD channel.sync(); try { - if(exchangeRegistry.getExchange(body.getExchange()) == null) + final String exchangeName = body.getExchange() == null ? null : body.getExchange().toString(); + + final Exchange exchange = virtualHost.getExchange(exchangeName); + if(exchange == null) { throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange()); } - exchangeRegistry.unregisterExchange(body.getExchange(), body.getIfUnused()); + + virtualHost.removeExchange(exchange, !body.getIfUnused()); ExchangeDeleteOkBody responseBody = session.getMethodRegistry().createExchangeDeleteOkBody(); - + session.writeFrame(responseBody.generateFrame(channelId)); } catch (ExchangeInUseException e) @@ -73,5 +81,15 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD throw body.getChannelException(AMQConstant.IN_USE, "Exchange in use"); // TODO: sort out consistent channel close mechanism that does all clean up etc. } + + catch (ExchangeIsAlternateException e) + { + throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange"); + + } + catch (RequiredExchangeException e) + { + throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange '"+body.getExchange()+"' cannot be deleted"); + } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java index 0501443efa..63b8bf3136 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -32,7 +33,6 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; @@ -62,7 +62,6 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> { AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); VirtualHost virtualHost = protocolConnection.getVirtualHost(); - ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); AMQChannel channel = protocolConnection.getChannel(channelId); @@ -103,10 +102,11 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> { throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist."); } - final Exchange exch = exchangeRegistry.getExchange(body.getExchange()); + final String exchangeName = body.getExchange() == null ? null : body.getExchange().toString(); + final Exchange exch = virtualHost.getExchange(exchangeName); if (exch == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + exchangeName + " does not exist."); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 521f27885f..035bf25e04 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -62,7 +62,6 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar final AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); final AMQSessionModel session = protocolConnection.getChannel(channelId); VirtualHost virtualHost = protocolConnection.getVirtualHost(); - ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java index 523c7acd88..e0e814537a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java @@ -1,5 +1,5 @@ /* - * + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,16 +7,16 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * + * */ package org.apache.qpid.server.handler; @@ -60,7 +60,6 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB { AMQProtocolSession session = stateManager.getProtocolSession(); VirtualHost virtualHost = session.getVirtualHost(); - ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); @@ -97,7 +96,7 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB { throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist."); } - final Exchange exch = exchangeRegistry.getExchange(body.getExchange()); + final Exchange exch = virtualHost.getExchange(body.getExchange() == null ? null : body.getExchange().toString()); if (exch == null) { throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist."); @@ -112,7 +111,7 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB exch.removeBinding(String.valueOf(routingKey), queue, FieldTable.convertToMap(body.getArguments())); } - + if (_log.isInfoEnabled()) { _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java index 5d3d507fff..07083fc661 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java @@ -42,6 +42,8 @@ import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Statistics; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.util.MapValueConverter; +import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; +import org.apache.qpid.server.virtualhost.RequiredExchangeException; import org.apache.qpid.server.virtualhost.VirtualHost; final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apache.qpid.server.exchange.Exchange.BindingListener @@ -121,7 +123,7 @@ final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apa return createBinding(bindingKey, queue, bindingArgs, attributes); } - + public org.apache.qpid.server.model.Binding createBinding(String bindingKey, Queue queue, Map<String, Object> bindingArguments, Map<String, Object> attributes) @@ -165,21 +167,11 @@ final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apa { try { - ExchangeRegistry exchangeRegistry = _vhost.getVirtualHost().getExchangeRegistry(); - if (exchangeRegistry.isReservedExchangeName(getName())) - { - throw new UnsupportedOperationException("'" + getName() + "' is a reserved exchange and can't be deleted"); - } - - if(_exchange.hasReferrers()) - { - throw new AMQException( AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange", null); - } - - synchronized(exchangeRegistry) - { - exchangeRegistry.unregisterExchange(getName(), false); - } + _vhost.getVirtualHost().removeExchange(_exchange, true); + } + catch(RequiredExchangeException e) + { + throw new UnsupportedOperationException("'" + getName() + "' is a reserved exchange and can't be deleted"); } catch(AMQException e) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index 74b826da91..c09dd9449e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -81,11 +81,13 @@ import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.plugin.VirtualHostFactory; +import org.apache.qpid.server.virtualhost.ExchangeExistsException; +import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; +import org.apache.qpid.server.virtualhost.UnknownExchangeException; +import org.apache.qpid.server.virtualhost.VirtualHostListener; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -public final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, ExchangeRegistry.RegistryChangeListener, - QueueRegistry.RegistryChangeListener, - IConnectionRegistry.RegistryChangeListener +public final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, VirtualHostListener { private static final Logger LOGGER = Logger.getLogger(VirtualHostAdapter.class); @@ -184,7 +186,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual private void populateExchanges() { Collection<org.apache.qpid.server.exchange.Exchange> actualExchanges = - _virtualHost.getExchangeRegistry().getExchanges(); + _virtualHost.getExchanges(); synchronized (_exchangeAdapters) { @@ -296,31 +298,81 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual try { - ExchangeRegistry exchangeRegistry = _virtualHost.getExchangeRegistry(); - if (exchangeRegistry.isReservedExchangeName(name)) + String alternateExchange = null; + if(attributes.containsKey(Exchange.ALTERNATE_EXCHANGE)) { - throw new UnsupportedOperationException("'" + name + "' is a reserved exchange name"); - } - synchronized(exchangeRegistry) - { - org.apache.qpid.server.exchange.Exchange exchange = exchangeRegistry.getExchange(name); - if (exchange != null) + Object altExchangeObject = attributes.get(Exchange.ALTERNATE_EXCHANGE); + if(altExchangeObject instanceof Exchange) { - throw new IllegalArgumentException("Exchange with name '" + name + "' already exists"); + alternateExchange = ((Exchange) altExchangeObject).getName(); } - exchange = _virtualHost.getExchangeFactory().createExchange(name, type, durable, - lifetime == LifetimePolicy.AUTO_DELETE); - _virtualHost.getExchangeRegistry().registerExchange(exchange); - if(durable) + else if(altExchangeObject instanceof UUID) { - DurableConfigurationStoreHelper.createExchange(_virtualHost.getDurableConfigurationStore(), - exchange); + for(Exchange ex : getExchanges()) + { + if(altExchangeObject.equals(ex.getId())) + { + alternateExchange = ex.getName(); + break; + } + } } - synchronized (_exchangeAdapters) + else if(altExchangeObject instanceof String) { - return _exchangeAdapters.get(exchange); + + for(Exchange ex : getExchanges()) + { + if(altExchangeObject.equals(ex.getName())) + { + alternateExchange = ex.getName(); + break; + } + } + if(alternateExchange == null) + { + try + { + UUID id = UUID.fromString(altExchangeObject.toString()); + for(Exchange ex : getExchanges()) + { + if(id.equals(ex.getId())) + { + alternateExchange = ex.getName(); + break; + } + } + } + catch(IllegalArgumentException e) + { + // ignore + } + + } } } + org.apache.qpid.server.exchange.Exchange exchange = _virtualHost.createExchange(null, + name, + type, + durable, + lifetime == LifetimePolicy.AUTO_DELETE, + alternateExchange); + synchronized (_exchangeAdapters) + { + return _exchangeAdapters.get(exchange); + } + + } + catch(ExchangeExistsException e) + { + throw new IllegalArgumentException("Exchange with name '" + name + "' already exists"); + } + catch(ReservedExchangeNameException e) + { + throw new UnsupportedOperationException("'" + name + "' is a reserved exchange name"); + } + catch(UnknownExchangeException e) + { + throw new IllegalArgumentException("Alternate Exchange with name '" + e.getExchangeName() + "' does not exist"); } catch(AMQException e) { @@ -726,7 +778,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual public Collection<String> getExchangeTypes() { Collection<ExchangeType<? extends org.apache.qpid.server.exchange.Exchange>> types = - _virtualHost.getExchangeFactory().getRegisteredTypes(); + _virtualHost.getExchangeTypes(); Collection<String> exchangeTypes = new ArrayList<String>(); @@ -884,7 +936,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual if(SUPPORTED_EXCHANGE_TYPES.equals(name)) { List<String> types = new ArrayList<String>(); - for(@SuppressWarnings("rawtypes") ExchangeType type : _virtualHost.getExchangeFactory().getRegisteredTypes()) + for(@SuppressWarnings("rawtypes") ExchangeType type : _virtualHost.getExchangeTypes()) { types.add(type.getName().asString()); } @@ -1009,7 +1061,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } else if(VirtualHost.EXCHANGE_COUNT.equals(name)) { - return _vhost.getExchangeRegistry().getExchanges().size(); + return _vhost.getExchanges().size(); } else if(VirtualHost.CONNECTION_COUNT.equals(name)) { @@ -1127,11 +1179,9 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual virtualHostRegistry.registerVirtualHost(_virtualHost); _statistics = new VirtualHostStatisticsAdapter(_virtualHost); - _virtualHost.getQueueRegistry().addRegistryChangeListener(this); + _virtualHost.addVirtualHostListener(this); populateQueues(); - _virtualHost.getExchangeRegistry().addRegistryChangeListener(this); populateExchanges(); - _virtualHost.getConnectionRegistry().addRegistryChangeListener(this); synchronized(_aliases) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java index 72d14456ed..0c14c06624 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java @@ -102,7 +102,7 @@ public class HeaderPropertiesConverter exchangeName = ""; } - Exchange exchange = vhost.getExchangeRegistry().getExchange(exchangeName); + Exchange exchange = vhost.getExchange(exchangeName); String exchangeClass = exchange == null ? ExchangeDefaults.DIRECT_EXCHANGE_CLASS.asString() : exchange.getType().getName().asString(); props.setReplyTo(exchangeClass + "://"+exchangeName+"//?routingkey='"+(routingKey==null ? "" : routingKey+"'")); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java index 40ef6ad6a2..7bd0728850 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java @@ -31,6 +31,6 @@ public interface ExchangeType<T extends Exchange> { public AMQShortString getName(); public T newInstance(UUID id, VirtualHost host, AMQShortString name, - boolean durable, int ticket, boolean autoDelete) throws AMQException; + boolean durable, boolean autoDelete) throws AMQException; public AMQShortString getDefaultExchangeName(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index a72671762c..ca67b6f79b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -215,7 +215,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS List<Binding> bindingsToRemove = new ArrayList<Binding>(); for(Binding existingBinding : bindings) { - if(existingBinding.getExchange() != _vhost.getExchangeRegistry().getDefaultExchange() + if(existingBinding.getExchange() != _vhost.getDefaultExchange() && existingBinding.getExchange() != exchange) { bindingsToRemove.add(existingBinding); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 2b39fb1ff0..ce6ef0b183 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -115,7 +115,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu } else { - Exchange exchg = _vhost.getExchangeRegistry().getExchange(addr); + Exchange exchg = _vhost.getExchange(addr); if(exchg != null) { destination = new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy()); @@ -244,7 +244,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu } String addr = target.getAddress(); - Exchange exchg = _vhost.getExchangeRegistry().getExchange(addr); + Exchange exchg = _vhost.getExchange(addr); if(exchg != null) { destination = new ExchangeDestination(exchg, target.getDurable(), diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index f9e678525f..1eeb6dccf3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -38,6 +38,7 @@ import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.store.DurableConfigurationStoreHelper; +import org.apache.qpid.server.virtualhost.ExchangeExistsException; import org.apache.qpid.server.virtualhost.VirtualHost; public class AMQQueueFactory @@ -300,25 +301,22 @@ public class AMQQueueFactory final String dlExchangeName = getDeadLetterExchangeName(queueName); final String dlQueueName = getDeadLetterQueueName(queueName); - final ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); - final ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory(); final QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); Exchange dlExchange = null; - synchronized(exchangeRegistry) - { - dlExchange = exchangeRegistry.getExchange(dlExchangeName); - - if(dlExchange == null) - { - dlExchange = exchangeFactory.createExchange(UUIDGenerator.generateExchangeUUID(dlExchangeName, virtualHost.getName()), new AMQShortString(dlExchangeName), ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0); - - exchangeRegistry.registerExchange(dlExchange); + final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, virtualHost.getName()); - //enter the dle in the persistent store - DurableConfigurationStoreHelper.createExchange(virtualHost.getDurableConfigurationStore(), - dlExchange); - } + try + { + dlExchange = virtualHost.createExchange(dlExchangeId, + dlExchangeName, + ExchangeDefaults.FANOUT_EXCHANGE_CLASS.toString(), + true, false, null); + } + catch(ExchangeExistsException e) + { + // We're ok if the exchange already exists + dlExchange = e.getExistingExchange(); } AMQQueue dlQueue = null; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 889fe7c5c1..b7ac86795d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -63,6 +63,11 @@ import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.txn.SuspendAndFailDtxException; import org.apache.qpid.server.txn.TimeoutDtxException; import org.apache.qpid.server.txn.UnknownDtxBranchException; +import org.apache.qpid.server.virtualhost.ExchangeExistsException; +import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; +import org.apache.qpid.server.virtualhost.RequiredExchangeException; +import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; +import org.apache.qpid.server.virtualhost.UnknownExchangeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.*; @@ -185,7 +190,7 @@ public class ServerSessionDelegate extends SessionDelegate if(!method.hasQueue()) { - exception(session,method,ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not supplied"); + exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not supplied"); } else { @@ -684,7 +689,6 @@ public class ServerSessionDelegate extends SessionDelegate { String exchangeName = method.getExchange(); VirtualHost virtualHost = getVirtualHost(session); - ExchangeRegistry exchangeRegistry = getExchangeRegistry(session); //we must check for any unsupported arguments present and throw not-implemented if(method.hasArguments()) @@ -697,114 +701,79 @@ public class ServerSessionDelegate extends SessionDelegate return; } } - synchronized(exchangeRegistry) + + if(method.getPassive()) { Exchange exchange = getExchange(session, exchangeName); - if(method.getPassive()) + if(exchange == null) { - if(exchange == null) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '" + exchangeName + "'"); - } - else + exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '" + exchangeName + "'"); + } + else + { + if (!exchange.getTypeShortString().toString().equals(method.getType()) + && (method.getType() != null && method.getType().length() > 0)) { - if (!exchange.getTypeShortString().toString().equals(method.getType()) - && (method.getType() != null && method.getType().length() > 0)) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: " - + exchangeName + " of type " + exchange.getTypeShortString() + " to " + method.getType() + "."); - } + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: " + + exchangeName + " of type " + exchange.getTypeShortString() + " to " + method.getType() + "."); } } - else + } + else + { + + try { - if (exchange == null) + virtualHost.createExchange(null, + method.getExchange(), + method.getType(), + method.getDurable(), + method.getAutoDelete(), + method.getAlternateExchange()); + } + catch(ReservedExchangeNameException e) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: " + + exchangeName + " which begins with reserved name or prefix."); + } + catch(UnknownExchangeException e) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, + "Unknown alternate exchange " + e.getExchangeName()); + } + catch(AMQUnknownExchangeType e) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType()); + } + catch(ExchangeExistsException e) + { + Exchange exchange = e.getExistingExchange(); + if(!exchange.getTypeShortString().toString().equals(method.getType())) { - if (exchangeName.startsWith("amq.")) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: " - + exchangeName + " which begins with reserved prefix 'amq.'."); - } - else if (exchangeName.startsWith("qpid.")) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: " - + exchangeName + " which begins with reserved prefix 'qpid.'."); - } - else - { - ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory(); - try - { - exchange = exchangeFactory.createExchange(method.getExchange(), - method.getType(), - method.getDurable(), - method.getAutoDelete()); - String alternateExchangeName = method.getAlternateExchange(); - boolean validAlternate; - if(alternateExchangeName != null && alternateExchangeName.length() != 0) - { - Exchange alternate = getExchange(session, alternateExchangeName); - if(alternate == null) - { - validAlternate = false; - } - else - { - exchange.setAlternateExchange(alternate); - validAlternate = true; - } - } - else - { - validAlternate = true; - } - if(validAlternate) - { - if (exchange.isDurable()) - { - DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); - DurableConfigurationStoreHelper.createExchange(store, exchange); - } - exchangeRegistry.registerExchange(exchange); - } - else - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, - "Unknown alternate exchange " + alternateExchangeName); - } - } - catch(AMQUnknownExchangeType e) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType()); - } - catch (AMQException e) - { - exception(session, method, e, "Cannot declare exchange '" + exchangeName); - } - } + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, + "Attempt to redeclare exchange: " + exchangeName + + " of type " + exchange.getTypeShortString() + + " to " + method.getType() +"."); } - else + else if(method.hasAlternateExchange() + && (exchange.getAlternateExchange() == null || + !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName()))) { - if(!exchange.getTypeShortString().toString().equals(method.getType())) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, - "Attempt to redeclare exchange: " + exchangeName - + " of type " + exchange.getTypeShortString() - + " to " + method.getType() +"."); - } - else if(method.hasAlternateExchange() - && (exchange.getAlternateExchange() == null || - !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName()))) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, - "Attempt to change alternate exchange of: " + exchangeName - + " from " + exchange.getAlternateExchange() - + " to " + method.getAlternateExchange() +"."); - } + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, + "Attempt to change alternate exchange of: " + exchangeName + + " from " + exchange.getAlternateExchange() + + " to " + method.getAlternateExchange() +"."); } } + catch (AMQException e) + { + exception(session, method, e, "Cannot declare exchange '" + exchangeName); + } + + } + } // TODO decouple AMQException and AMQConstant error codes @@ -841,32 +810,25 @@ public class ServerSessionDelegate extends SessionDelegate private Exchange getExchange(Session session, String exchangeName) { - ExchangeRegistry exchangeRegistry = getExchangeRegistry(session); - return exchangeRegistry.getExchange(exchangeName); - } - - private ExchangeRegistry getExchangeRegistry(Session session) - { - VirtualHost virtualHost = getVirtualHost(session); - return virtualHost.getExchangeRegistry(); - + return getVirtualHost(session).getExchange(exchangeName); } private Exchange getExchangeForMessage(Session ssn, MessageTransfer xfr) { - final ExchangeRegistry exchangeRegistry = getExchangeRegistry(ssn); + VirtualHost virtualHost = getVirtualHost(ssn); + Exchange exchange; if(xfr.hasDestination()) { - exchange = exchangeRegistry.getExchange(xfr.getDestination()); + exchange = virtualHost.getExchange(xfr.getDestination()); if(exchange == null) { - exchange = exchangeRegistry.getDefaultExchange(); + exchange = virtualHost.getDefaultExchange(); } } else { - exchange = exchangeRegistry.getDefaultExchange(); + exchange = virtualHost.getDefaultExchange(); } return exchange; } @@ -888,7 +850,6 @@ public class ServerSessionDelegate extends SessionDelegate public void exchangeDelete(Session session, ExchangeDelete method) { VirtualHost virtualHost = getVirtualHost(session); - ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); try { @@ -904,29 +865,23 @@ public class ServerSessionDelegate extends SessionDelegate { exception(session, method, ExecutionErrorCode.NOT_FOUND, "No such exchange '" + method.getExchange() + "'"); } - else if(exchange.hasReferrers()) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange in use as an alternate exchange"); - } - else if(isStandardExchange(exchange, virtualHost.getExchangeFactory().getRegisteredTypes())) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange '"+method.getExchange()+"' cannot be deleted"); - } else { - exchangeRegistry.unregisterExchange(method.getExchange(), method.getIfUnused()); - - if (exchange.isDurable() && !exchange.isAutoDelete()) - { - DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); - DurableConfigurationStoreHelper.removeExchange(store, exchange); - } + virtualHost.removeExchange(exchange, !method.getIfUnused()); } } catch (ExchangeInUseException e) { exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Exchange in use"); } + catch (ExchangeIsAlternateException e) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange in use as an alternate exchange"); + } + catch (RequiredExchangeException e) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange '"+method.getExchange()+"' cannot be deleted"); + } catch (AMQException e) { exception(session, method, e, "Cannot delete exchange '" + method.getExchange() ); @@ -982,7 +937,6 @@ public class ServerSessionDelegate extends SessionDelegate { VirtualHost virtualHost = getVirtualHost(session); - ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); if (!method.hasQueue()) @@ -1002,7 +956,7 @@ public class ServerSessionDelegate extends SessionDelegate method.setBindingKey(method.getQueue()); } AMQQueue queue = queueRegistry.getQueue(method.getQueue()); - Exchange exchange = exchangeRegistry.getExchange(method.getExchange()); + Exchange exchange = virtualHost.getExchange(method.getExchange()); if(queue == null) { exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found"); @@ -1045,7 +999,6 @@ public class ServerSessionDelegate extends SessionDelegate public void exchangeUnbind(Session session, ExchangeUnbind method) { VirtualHost virtualHost = getVirtualHost(session); - ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); if (!method.hasQueue()) @@ -1063,7 +1016,7 @@ public class ServerSessionDelegate extends SessionDelegate else { AMQQueue queue = queueRegistry.getQueue(method.getQueue()); - Exchange exchange = exchangeRegistry.getExchange(method.getExchange()); + Exchange exchange = virtualHost.getExchange(method.getExchange()); if(queue == null) { exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found"); @@ -1091,11 +1044,12 @@ public class ServerSessionDelegate extends SessionDelegate { ExchangeBoundResult result = new ExchangeBoundResult(); + VirtualHost virtualHost = getVirtualHost(session); Exchange exchange; AMQQueue queue; if(method.hasExchange()) { - exchange = getExchange(session, method.getExchange()); + exchange = virtualHost.getExchange(method.getExchange()); if(exchange == null) { @@ -1104,7 +1058,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - exchange = getExchangeRegistry(session).getDefaultExchange(); + exchange = virtualHost.getDefaultExchange(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index e89fa8b545..15f5becec2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.virtualhost; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -32,7 +34,6 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.configuration.ExchangeConfiguration; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.configuration.VirtualHostConfiguration; @@ -42,10 +43,12 @@ import org.apache.qpid.server.exchange.DefaultExchangeFactory; import org.apache.qpid.server.exchange.DefaultExchangeRegistry; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeInUseException; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.v1_0.LinkRegistry; @@ -56,6 +59,7 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; @@ -259,7 +263,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg { _logger.debug("Loading configuration for virtualhost: " + config.getName()); - _exchangeRegistry.initialise(); + _exchangeRegistry.initialise(_exchangeFactory); List<String> exchangeNames = config.getExchanges(); @@ -279,25 +283,18 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg private void configureExchange(ExchangeConfiguration exchangeConfiguration) throws AMQException { - AMQShortString exchangeName = new AMQShortString(exchangeConfiguration.getName()); - - Exchange exchange; - exchange = _exchangeRegistry.getExchange(exchangeName); - if (exchange == null) + boolean durable = exchangeConfiguration.getDurable(); + boolean autodelete = exchangeConfiguration.getAutoDelete(); + try { - - AMQShortString type = new AMQShortString(exchangeConfiguration.getType()); - boolean durable = exchangeConfiguration.getDurable(); - boolean autodelete = exchangeConfiguration.getAutoDelete(); - - Exchange newExchange = _exchangeFactory.createExchange(exchangeName, type, durable, autodelete, 0); - _exchangeRegistry.registerExchange(newExchange); - - if (newExchange.isDurable()) - { - DurableConfigurationStoreHelper.createExchange(getDurableConfigurationStore(), newExchange); - } + Exchange newExchange = createExchange(null, exchangeConfiguration.getName(), exchangeConfiguration.getType(), durable, autodelete, + null); } + catch(ExchangeExistsException e) + { + _logger.info("Exchange " + exchangeConfiguration.getName() + " already defined. Configuration in XML file ignored"); + } + } private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException @@ -374,16 +371,162 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg return _queueRegistry; } - public ExchangeRegistry getExchangeRegistry() + protected ExchangeRegistry getExchangeRegistry() { return _exchangeRegistry; } - public ExchangeFactory getExchangeFactory() + protected ExchangeFactory getExchangeFactory() { return _exchangeFactory; } + @Override + public void addVirtualHostListener(final VirtualHostListener listener) + { + _exchangeRegistry.addRegistryChangeListener(new ExchangeRegistry.RegistryChangeListener() + { + @Override + public void exchangeRegistered(Exchange exchange) + { + listener.exchangeRegistered(exchange); + } + + @Override + public void exchangeUnregistered(Exchange exchange) + { + listener.exchangeUnregistered(exchange); + } + }); + _queueRegistry.addRegistryChangeListener(new QueueRegistry.RegistryChangeListener() + { + @Override + public void queueRegistered(AMQQueue queue) + { + listener.queueRegistered(queue); + } + + @Override + public void queueUnregistered(AMQQueue queue) + { + listener.queueUnregistered(queue); + } + }); + _connectionRegistry.addRegistryChangeListener(new IConnectionRegistry.RegistryChangeListener() + { + @Override + public void connectionRegistered(AMQConnectionModel connection) + { + listener.connectionRegistered(connection); + } + + @Override + public void connectionUnregistered(AMQConnectionModel connection) + { + listener.connectionUnregistered(connection); + } + }); + } + + @Override + public Exchange getExchange(String name) + { + return _exchangeRegistry.getExchange(name); + } + + @Override + public Exchange getDefaultExchange() + { + return _exchangeRegistry.getDefaultExchange(); + } + + @Override + public Collection<Exchange> getExchanges() + { + return Collections.unmodifiableCollection(_exchangeRegistry.getExchanges()); + } + + @Override + public Collection<ExchangeType<? extends Exchange>> getExchangeTypes() + { + return _exchangeFactory.getRegisteredTypes(); + } + + @Override + public Exchange createExchange(UUID id, + String name, + String type, + boolean durable, + boolean autoDelete, + String alternateExchangeName) + throws AMQException + { + + if(_exchangeRegistry.isReservedExchangeName(name)) + { + throw new ReservedExchangeNameException(name); + } + synchronized (_exchangeRegistry) + { + Exchange existing; + if((existing = _exchangeRegistry.getExchange(name)) !=null) + { + throw new ExchangeExistsException(name,existing); + } + Exchange alternateExchange; + + if(alternateExchangeName != null) + { + alternateExchange = _exchangeRegistry.getExchange(alternateExchangeName); + if(alternateExchange == null) + { + throw new UnknownExchangeException(alternateExchangeName); + } + } + else + { + alternateExchange = null; + } + + if(id == null) + { + id = UUIDGenerator.generateExchangeUUID(name, getName()); + } + + Exchange exchange = _exchangeFactory.createExchange(id, name, type, durable, autoDelete); + exchange.setAlternateExchange(alternateExchange); + _exchangeRegistry.registerExchange(exchange); + if(durable) + { + DurableConfigurationStoreHelper.createExchange(getDurableConfigurationStore(), exchange); + } + return exchange; + } + } + + @Override + public void removeExchange(Exchange exchange, boolean force) throws AMQException + { + if(exchange.hasReferrers()) + { + throw new ExchangeIsAlternateException(exchange.getName()); + } + + for(ExchangeType type : getExchangeTypes()) + { + if(type.getDefaultExchangeName().toString().equals( exchange.getName() )) + { + throw new RequiredExchangeException(exchange.getName()); + } + } + _exchangeRegistry.unregisterExchange(exchange.getName(), !force); + if (exchange.isDurable() && !exchange.isAutoDelete()) + { + DurableConfigurationStoreHelper.removeExchange(getDurableConfigurationStore(), exchange); + } + + } + public SecurityManager getSecurityManager() { return _securityManager; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeExistsException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeExistsException.java new file mode 100644 index 0000000000..f055760efe --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeExistsException.java @@ -0,0 +1,39 @@ +package org.apache.qpid.server.virtualhost;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.exchange.Exchange; + +public class ExchangeExistsException extends AMQException +{ + private final Exchange _existing; + + public ExchangeExistsException(String name, Exchange existing) + { + super(name); + _existing = existing; + } + + public Exchange getExistingExchange() + { + return _existing; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeIsAlternateException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeIsAlternateException.java new file mode 100644 index 0000000000..4be64a3b94 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeIsAlternateException.java @@ -0,0 +1,30 @@ +package org.apache.qpid.server.virtualhost;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import org.apache.qpid.AMQException; + +public class ExchangeIsAlternateException extends AMQException +{ + public ExchangeIsAlternateException(String name) + { + super(name); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/RequiredExchangeException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/RequiredExchangeException.java new file mode 100644 index 0000000000..da4c9825b1 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/RequiredExchangeException.java @@ -0,0 +1,30 @@ +package org.apache.qpid.server.virtualhost;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import org.apache.qpid.AMQException; + +public class RequiredExchangeException extends AMQException +{ + public RequiredExchangeException(String name) + { + super(name); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ReservedExchangeNameException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ReservedExchangeNameException.java new file mode 100644 index 0000000000..585f045ad9 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ReservedExchangeNameException.java @@ -0,0 +1,38 @@ +package org.apache.qpid.server.virtualhost;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import org.apache.qpid.AMQException; + +public class ReservedExchangeNameException extends AMQException +{ + private final String _name; + + public ReservedExchangeNameException(String name) + { + super("Attempt to create an exchange using a reserved name or prefix: " + name); + _name = name; + } + + public String getName() + { + return _name; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java index 82be0c01e1..b34444cb4c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java @@ -96,7 +96,7 @@ public class StandardVirtualHost extends AbstractVirtualHost _durableConfigurationStore = initialiseConfigurationStore(virtualHost); - VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this); + VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getExchangeRegistry(), getExchangeFactory()); _durableConfigurationStore.configureConfigStore(getName(), recoveryHandler, virtualHost); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/UnknownExchangeException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/UnknownExchangeException.java new file mode 100644 index 0000000000..5704126f62 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/UnknownExchangeException.java @@ -0,0 +1,38 @@ +package org.apache.qpid.server.virtualhost;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import org.apache.qpid.AMQException; + +public class UnknownExchangeException extends AMQException +{ + private final String _exchangeName; + + public UnknownExchangeException(String exchangeName) + { + super(exchangeName); + _exchangeName = exchangeName; + } + + public String getExchangeName() + { + return _exchangeName; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 8919f4d348..2435854912 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -20,13 +20,18 @@ */ package org.apache.qpid.server.virtualhost; +import java.util.Collection; import java.util.UUID; import java.util.concurrent.ScheduledFuture; +import org.apache.qpid.AMQException; import org.apache.qpid.common.Closeable; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.IConnectionRegistry; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeInUseException; import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.v1_0.LinkRegistry; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.SecurityManager; @@ -45,9 +50,23 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable QueueRegistry getQueueRegistry(); - ExchangeRegistry getExchangeRegistry(); + Exchange createExchange(UUID id, + String exchange, + String type, + boolean durable, + boolean autoDelete, + String alternateExchange) + throws AMQException; - ExchangeFactory getExchangeFactory(); + void removeExchange(Exchange exchange, boolean force) throws AMQException; + + Exchange getExchange(String name); + + Exchange getDefaultExchange(); + + Collection<Exchange> getExchanges(); + + Collection<ExchangeType<? extends Exchange>> getExchangeTypes(); DurableConfigurationStore getDurableConfigurationStore(); @@ -55,6 +74,8 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable SecurityManager getSecurityManager(); + void addVirtualHostListener(VirtualHostListener listener); + void close(); UUID getId(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index 3c7e1395d1..34c42cc9cc 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -31,9 +31,10 @@ import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.logging.messages.TransactionLogMessages; @@ -82,12 +83,19 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa private final Map<String, Map<UUID, Map<String, Object>>> _configuredObjects = new HashMap<String, Map<UUID, Map<String, Object>>>(); + private final ExchangeRegistry _exchangeRegistry; + private final ExchangeFactory _exchangeFactory; + private MessageStoreLogSubject _logSubject; private MessageStore _store; - public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost) + public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost, + ExchangeRegistry exchangeRegistry, + ExchangeFactory exchangeFactory) { _virtualHost = virtualHost; + _exchangeRegistry = exchangeRegistry; + _exchangeFactory = exchangeFactory; } @Override @@ -120,7 +128,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa if (alternateExchangeId != null) { - Exchange altExchange = _virtualHost.getExchangeRegistry().getExchange(alternateExchangeId); + Exchange altExchange = _exchangeRegistry.getExchange(alternateExchangeId); if (altExchange == null) { _logger.error("Unknown exchange id " + alternateExchangeId + ", cannot set alternate exchange on queue with id " + id); @@ -146,12 +154,11 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa try { Exchange exchange; - AMQShortString exchangeNameSS = new AMQShortString(exchangeName); - exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeNameSS); + exchange = _exchangeRegistry.getExchange(exchangeName); if (exchange == null) { - exchange = _virtualHost.getExchangeFactory().createExchange(id, exchangeNameSS, new AMQShortString(type), true, autoDelete, 0); - _virtualHost.getExchangeRegistry().registerExchange(exchange); + exchange = _exchangeFactory.createExchange(id, exchangeName, type, true, autoDelete); + _exchangeRegistry.registerExchange(exchange); } } catch (AMQException e) @@ -352,7 +359,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa { try { - Exchange exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeId); + Exchange exchange = _exchangeRegistry.getExchange(exchangeId); if (exchange == null) { _logger.error("Unknown exchange id " + exchangeId + ", cannot bind queue with id " + queueId); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java new file mode 100644 index 0000000000..8527435eea --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.queue.AMQQueue; + +public interface VirtualHostListener +{ + + public void queueRegistered(AMQQueue queue); + + public void queueUnregistered(AMQQueue queue); + + public void connectionRegistered(AMQConnectionModel connection); + + public void connectionUnregistered(AMQConnectionModel connection); + + public void exchangeRegistered(Exchange exchange); + + public void exchangeUnregistered(Exchange exchange); +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java index 341ab1b372..56f118cf7d 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java @@ -179,8 +179,8 @@ public class DefaultExchangeFactoryTest extends QpidTestCase } @Override - public Exchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, - boolean autoDelete) throws AMQException + public Exchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable, + boolean autoDelete) throws AMQException { return null; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java index 7b7e2ec346..f608bc8cb0 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java @@ -64,7 +64,7 @@ public class FanoutExchangeTest extends TestCase when(securityManager.authoriseBind(any(Exchange.class), any(AMQQueue.class), any(AMQShortString.class))).thenReturn(true); when(securityManager.authoriseUnbind(any(Exchange.class), any(AMQShortString.class), any(AMQQueue.class))).thenReturn(true); - _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, 0, false); + _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, false); } public void testIsBoundAMQShortStringFieldTableAMQQueueWhenQueueIsNull() diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index 2b965358e0..d76c7d1128 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -68,7 +68,7 @@ public class HeadersExchangeTest extends TestCase when(securityManager.authoriseBind(any(Exchange.class), any(AMQQueue.class), any(AMQShortString.class))).thenReturn(true); when(securityManager.authoriseUnbind(any(Exchange.class), any(AMQShortString.class), any(AMQQueue.class))).thenReturn(true); - _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, 0, false); + _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, false); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java index a33c85dfd1..e63744af9a 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java @@ -45,7 +45,7 @@ public class BindingLogSubjectTest extends AbstractTestLogSubject _testVhost = BrokerTestHelper.createVirtualHost("test"); _routingKey = new AMQShortString("RoutingKey"); - _exchange = _testVhost.getExchangeRegistry().getExchange("amq.direct"); + _exchange = _testVhost.getExchange("amq.direct"); _queue = new MockAMQQueue("BindingLogSubjectTest"); ((MockAMQQueue) _queue).setVirtualHost(_testVhost); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java index 775a306bd3..b327738797 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java @@ -40,7 +40,7 @@ public class ExchangeLogSubjectTest extends AbstractTestLogSubject _testVhost = BrokerTestHelper.createVirtualHost("test"); - _exchange = _testVhost.getExchangeRegistry().getExchange("amq.direct"); + _exchange = _testVhost.getExchange("amq.direct"); _subject = new ExchangeLogSubject(_exchange,_testVhost); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index 8d5e6b8ee3..c8e0e53d75 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -142,25 +142,24 @@ public class AMQQueueFactoryTest extends QpidTestCase fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); String queueName = "testDeadLetterQueueEnabled"; - AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); - AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX; + String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; QueueRegistry qReg = _virtualHost.getQueueRegistry(); - ExchangeRegistry exReg = _virtualHost.getExchangeRegistry(); assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); - assertNull("The alternate exchange should not yet exist", exReg.getExchange(dlExchangeName)); + assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName)); AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false, _virtualHost, FieldTable.convertToMap(fieldTable)); Exchange altExchange = queue.getAlternateExchange(); assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange); - assertEquals("Alternate exchange name was not as expected", dlExchangeName.asString(), altExchange.getName()); + assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName()); assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType().getName()); - assertNotNull("The alternate exchange was not registered as expected", exReg.getExchange(dlExchangeName)); - assertEquals("The registered exchange was not the expected exchange instance", altExchange, exReg.getExchange(dlExchangeName)); + assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getExchange(dlExchangeName)); + assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getExchange(dlExchangeName)); AMQQueue dlQueue = qReg.getQueue(dlQueueName); assertNotNull("The DLQ was not registered as expected", dlQueue); @@ -180,14 +179,13 @@ public class AMQQueueFactoryTest extends QpidTestCase public void testDeadLetterQueueDoesNotInheritDLQorMDCSettings() throws Exception { String queueName = "testDeadLetterQueueEnabled"; - AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); - AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX; + String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; QueueRegistry qReg = _virtualHost.getQueueRegistry(); - ExchangeRegistry exReg = _virtualHost.getExchangeRegistry(); assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); - assertNull("The alternate exchange should not yet exist", exReg.getExchange(dlExchangeName)); + assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName)); AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false, _virtualHost, null); @@ -195,11 +193,11 @@ public class AMQQueueFactoryTest extends QpidTestCase assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryCount()); Exchange altExchange = queue.getAlternateExchange(); assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange); - assertEquals("Alternate exchange name was not as expected", dlExchangeName.toString(), altExchange.getName()); + assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName()); assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType().getName()); - assertNotNull("The alternate exchange was not registered as expected", exReg.getExchange(dlExchangeName)); - assertEquals("The registered exchange was not the expected exchange instance", altExchange, exReg.getExchange(dlExchangeName)); + assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getExchange(dlExchangeName)); + assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getExchange(dlExchangeName)); AMQQueue dlQueue = qReg.getQueue(dlQueueName); assertNotNull("The DLQ was not registered as expected", dlQueue); @@ -222,20 +220,19 @@ public class AMQQueueFactoryTest extends QpidTestCase fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, false); String queueName = "testDeadLetterQueueDisabled"; - AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); - AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX; + String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; QueueRegistry qReg = _virtualHost.getQueueRegistry(); - ExchangeRegistry exReg = _virtualHost.getExchangeRegistry(); assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); - assertNull("The alternate exchange should not exist", exReg.getExchange(dlExchangeName)); + assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName)); AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false, _virtualHost, FieldTable.convertToMap(fieldTable)); assertNull("Queue should not have an alternate exchange as DLQ is disabled", queue.getAlternateExchange()); - assertNull("The alternate exchange should still not exist", exReg.getExchange(dlExchangeName)); + assertNull("The alternate exchange should still not exist", _virtualHost.getExchange(dlExchangeName)); assertNull("The DLQ should still not exist", qReg.getQueue(dlQueueName)); @@ -255,14 +252,13 @@ public class AMQQueueFactoryTest extends QpidTestCase fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); String queueName = "testDeadLetterQueueNotCreatedForAutodeleteQueues"; - AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); - AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX; + String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; QueueRegistry qReg = _virtualHost.getQueueRegistry(); - ExchangeRegistry exReg = _virtualHost.getExchangeRegistry(); assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); - assertNull("The alternate exchange should not exist", exReg.getExchange(dlExchangeName)); + assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName)); //create an autodelete queue AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", true, false, @@ -271,7 +267,7 @@ public class AMQQueueFactoryTest extends QpidTestCase //ensure that the autodelete property overrides the request to enable DLQ assertNull("Queue should not have an alternate exchange as queue is autodelete", queue.getAlternateExchange()); - assertNull("The alternate exchange should not exist as queue is autodelete", exReg.getExchange(dlExchangeName)); + assertNull("The alternate exchange should not exist as queue is autodelete", _virtualHost.getExchange(dlExchangeName)); assertNull("The DLQ should not exist as queue is autodelete", qReg.getQueue(dlQueueName)); //only 1 queue should have been registered diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 505c47a69b..c37e6da729 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -112,7 +112,7 @@ public class SimpleAMQQueueTest extends QpidTestCase _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), _qname.asString(), false, _owner.asString(), false, false, _virtualHost, FieldTable.convertToMap(_arguments)); - _exchange = (DirectExchange) _virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME); + _exchange = (DirectExchange) _virtualHost.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString()); } @Override @@ -423,7 +423,7 @@ public class SimpleAMQQueueTest extends QpidTestCase assertEquals("Unexpected total number of messages sent to both after enqueue", 2, subscription1.getMessages().size() + subscription2.getMessages().size()); /* Now release the first message only, causing it to be requeued */ - queueEntries.get(0).release(); + queueEntries.get(0).release(); Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index af4bbd1731..8b678c4eb4 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -21,6 +21,8 @@ package org.apache.qpid.server.store; +import java.util.ArrayList; +import java.util.Collection; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.AMQException; @@ -78,9 +80,9 @@ public class MessageStoreTest extends QpidTestCase public static final String SELECTOR_VALUE = "Test = 'MST'"; public static final String LVQ_KEY = "MST-LVQ-KEY"; - private AMQShortString nonDurableExchangeName = new AMQShortString("MST-NonDurableDirectExchange"); - private AMQShortString directExchangeName = new AMQShortString("MST-DirectExchange"); - private AMQShortString topicExchangeName = new AMQShortString("MST-TopicExchange"); + private String nonDurableExchangeName = "MST-NonDurableDirectExchange"; + private String directExchangeName = "MST-DirectExchange"; + private String topicExchangeName = "MST-TopicExchange"; private AMQShortString durablePriorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue-Durable"); private AMQShortString durableTopicQueueName = new AMQShortString("MST-TopicQueue-Durable"); @@ -365,12 +367,12 @@ public class MessageStoreTest extends QpidTestCase */ public void testExchangePersistence() throws Exception { - int origExchangeCount = getVirtualHost().getExchangeRegistry().getExchangeNames().size(); + int origExchangeCount = getVirtualHost().getExchanges().size(); - Map<AMQShortString, Exchange> oldExchanges = createExchanges(); + Map<String, Exchange> oldExchanges = createExchanges(); assertEquals("Incorrect number of exchanges registered before recovery", - origExchangeCount + 3, getVirtualHost().getExchangeRegistry().getExchangeNames().size()); + origExchangeCount + 3, getVirtualHost().getExchanges().size()); reloadVirtualHost(); @@ -385,31 +387,28 @@ public class MessageStoreTest extends QpidTestCase */ public void testDurableExchangeRemoval() throws Exception { - int origExchangeCount = getVirtualHost().getExchangeRegistry().getExchangeNames().size(); + int origExchangeCount = getVirtualHost().getExchanges().size(); createExchange(DirectExchange.TYPE, directExchangeName, true); - ExchangeRegistry exchangeRegistry = getVirtualHost().getExchangeRegistry(); assertEquals("Incorrect number of exchanges registered before recovery", - origExchangeCount + 1, exchangeRegistry.getExchangeNames().size()); + origExchangeCount + 1, getVirtualHost().getExchanges().size()); reloadVirtualHost(); - exchangeRegistry = getVirtualHost().getExchangeRegistry(); assertEquals("Incorrect number of exchanges registered after first recovery", - origExchangeCount + 1, exchangeRegistry.getExchangeNames().size()); + origExchangeCount + 1, getVirtualHost().getExchanges().size()); //test that removing the exchange means it is not recovered next time - final Exchange exchange = exchangeRegistry.getExchange(directExchangeName); + final Exchange exchange = getVirtualHost().getExchange(directExchangeName); DurableConfigurationStoreHelper.removeExchange(getVirtualHost().getDurableConfigurationStore(), exchange); reloadVirtualHost(); - exchangeRegistry = getVirtualHost().getExchangeRegistry(); assertEquals("Incorrect number of exchanges registered after second recovery", - origExchangeCount, exchangeRegistry.getExchangeNames().size()); + origExchangeCount, getVirtualHost().getExchanges().size()); assertNull("Durable exchange was not removed:" + directExchangeName, - exchangeRegistry.getExchange(directExchangeName)); + getVirtualHost().getExchange(directExchangeName)); } /** @@ -420,12 +419,12 @@ public class MessageStoreTest extends QpidTestCase */ public void testBindingPersistence() throws Exception { - int origExchangeCount = getVirtualHost().getExchangeRegistry().getExchangeNames().size(); + int origExchangeCount = getVirtualHost().getExchanges().size(); createAllQueues(); createAllTopicQueues(); - Map<AMQShortString, Exchange> exchanges = createExchanges(); + Map<String, Exchange> exchanges = createExchanges(); Exchange nonDurableExchange = exchanges.get(nonDurableExchangeName); Exchange directExchange = exchanges.get(directExchangeName); @@ -436,7 +435,7 @@ public class MessageStoreTest extends QpidTestCase bindAllTopicQueuesToExchange(topicExchange, topicRouting); assertEquals("Incorrect number of exchanges registered before recovery", - origExchangeCount + 3, getVirtualHost().getExchangeRegistry().getExchangeNames().size()); + origExchangeCount + 3, getVirtualHost().getExchanges().size()); reloadVirtualHost(); @@ -469,8 +468,7 @@ public class MessageStoreTest extends QpidTestCase assertEquals("Incorrect number of bindings registered after first recovery", 1, queueRegistry.getQueue(durableQueueName).getBindings().size()); - ExchangeRegistry exchangeRegistry = getVirtualHost().getExchangeRegistry(); - exch = exchangeRegistry.getExchange(directExchangeName); + exch = getVirtualHost().getExchange(directExchangeName); assertNotNull("Exchange was not recovered", exch); //remove the binding and verify result after recovery @@ -488,26 +486,30 @@ public class MessageStoreTest extends QpidTestCase * and that the new exchanges are not the same objects as the provided list (i.e. that the * reload actually generated new exchange objects) */ - private void validateExchanges(int originalNumExchanges, Map<AMQShortString, Exchange> oldExchanges) + private void validateExchanges(int originalNumExchanges, Map<String, Exchange> oldExchanges) { - ExchangeRegistry registry = getVirtualHost().getExchangeRegistry(); - + Collection<Exchange> exchanges = getVirtualHost().getExchanges(); + Collection<String> exchangeNames = new ArrayList(exchanges.size()); + for(Exchange exchange : exchanges) + { + exchangeNames.add(exchange.getName()); + } assertTrue(directExchangeName + " exchange NOT reloaded", - registry.getExchangeNames().contains(directExchangeName)); + exchangeNames.contains(directExchangeName)); assertTrue(topicExchangeName + " exchange NOT reloaded", - registry.getExchangeNames().contains(topicExchangeName)); + exchangeNames.contains(topicExchangeName)); assertTrue(nonDurableExchangeName + " exchange reloaded", - !registry.getExchangeNames().contains(nonDurableExchangeName)); + !exchangeNames.contains(nonDurableExchangeName)); //check the old exchange objects are not the same as the new exchanges assertTrue(directExchangeName + " exchange NOT reloaded", - registry.getExchange(directExchangeName) != oldExchanges.get(directExchangeName)); + getVirtualHost().getExchange(directExchangeName) != oldExchanges.get(directExchangeName)); assertTrue(topicExchangeName + " exchange NOT reloaded", - registry.getExchange(topicExchangeName) != oldExchanges.get(topicExchangeName)); + getVirtualHost().getExchange(topicExchangeName) != oldExchanges.get(topicExchangeName)); // There should only be the original exchanges + our 2 recovered durable exchanges assertEquals("Incorrect number of exchanges available", - originalNumExchanges + 2, registry.getExchangeNames().size()); + originalNumExchanges + 2, getVirtualHost().getExchanges().size()); } /** Validates the Durable queues and their properties are as expected following recovery */ @@ -771,9 +773,9 @@ public class MessageStoreTest extends QpidTestCase } - private Map<AMQShortString, Exchange> createExchanges() + private Map<String, Exchange> createExchanges() { - Map<AMQShortString, Exchange> exchanges = new HashMap<AMQShortString, Exchange>(); + Map<String, Exchange> exchanges = new HashMap<String, Exchange>(); //Register non-durable DirectExchange exchanges.put(nonDurableExchangeName, createExchange(DirectExchange.TYPE, nonDurableExchangeName, false)); @@ -785,32 +787,19 @@ public class MessageStoreTest extends QpidTestCase return exchanges; } - private Exchange createExchange(ExchangeType<?> type, AMQShortString name, boolean durable) + private Exchange createExchange(ExchangeType<?> type, String name, boolean durable) { Exchange exchange = null; try { - exchange = type.newInstance(UUIDGenerator.generateRandomUUID(), getVirtualHost(), name, durable, 0, false); + exchange = getVirtualHost().createExchange(null, name, type.getName().toString(), durable, false, null); } catch (AMQException e) { fail(e.getMessage()); } - try - { - getVirtualHost().getExchangeRegistry().registerExchange(exchange); - if (durable) - { - DurableConfigurationStoreHelper.createExchange(getVirtualHost().getDurableConfigurationStore(), - exchange); - } - } - catch (AMQException e) - { - fail(e.getMessage()); - } return exchange; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index 7811d04997..24cdf34360 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -190,7 +190,7 @@ public class BrokerTestHelper when(info.getExchange()).thenReturn(exchangeNameAsShortString); when(info.getRoutingKey()).thenReturn(rouningKey); - Exchange exchange = channel.getVirtualHost().getExchangeRegistry().getExchange(exchangeName); + Exchange exchange = channel.getVirtualHost().getExchange(exchangeName); for (int count = 0; count < numberOfMessages; count++) { channel.setPublishFrame(info, exchange); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index 7552a653fe..78d55c42bf 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -20,12 +20,16 @@ */ package org.apache.qpid.server.virtualhost; +import java.util.Collection; import java.util.concurrent.ScheduledFuture; +import org.apache.qpid.AMQException; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.exchange.AbstractExchange; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.v1_0.LinkRegistry; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.SecurityManager; @@ -77,16 +81,6 @@ public class MockVirtualHost implements VirtualHost return null; } - public ExchangeFactory getExchangeFactory() - { - return null; - } - - public ExchangeRegistry getExchangeRegistry() - { - return null; - } - public int getHouseKeepingActiveCount() { return 0; @@ -127,11 +121,56 @@ public class MockVirtualHost implements VirtualHost return null; } + @Override + public Exchange createExchange(UUID id, + String exchange, + String type, + boolean durable, + boolean autoDelete, + String alternateExchange) throws AMQException + { + return null; + } + + @Override + public void removeExchange(Exchange exchange, boolean force) throws AMQException + { + } + + @Override + public Exchange getExchange(String name) + { + return null; + } + + @Override + public Exchange getDefaultExchange() + { + return null; + } + + @Override + public Collection<Exchange> getExchanges() + { + return null; + } + + @Override + public Collection<ExchangeType<? extends Exchange>> getExchangeTypes() + { + return null; + } + public SecurityManager getSecurityManager() { return null; } + @Override + public void addVirtualHostListener(VirtualHostListener listener) + { + } + public LinkRegistry getLinkRegistry(String remoteContainerId) { return null; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java index ae09e8d7e7..6b8ea0e80b 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java @@ -200,7 +200,7 @@ public class StandardVirtualHostTest extends QpidTestCase File config = writeConfigFile(vhostName, queueName, exchangeName, false, new String[]{"ping","pong"}, bindingArguments); VirtualHost vhost = createVirtualHost(vhostName, config); - Exchange exch = vhost.getExchangeRegistry().getExchange(getName() +".direct"); + Exchange exch = vhost.getExchange(getName() +".direct"); Collection<Binding> bindings = exch.getBindings(); assertNotNull("Bindings cannot be null", bindings); assertEquals("Unexpected number of bindings", 3, bindings.size()); @@ -245,10 +245,10 @@ public class StandardVirtualHostTest extends QpidTestCase AMQQueue queue = vhost.getQueueRegistry().getQueue(queueName); assertNotNull("queue should exist", queue); - Exchange defaultExch = vhost.getExchangeRegistry().getDefaultExchange(); + Exchange defaultExch = vhost.getDefaultExchange(); assertTrue("queue should have been bound to default exchange with its name", defaultExch.isBound(queueName, queue)); - Exchange exch = vhost.getExchangeRegistry().getExchange(exchangeName); + Exchange exch = vhost.getExchange(exchangeName); assertTrue("queue should have been bound to " + exchangeName + " with its name", exch.isBound(queueName, queue)); for(String key: routingKeys) |