summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-07-05 15:30:53 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-07-05 15:30:53 +0000
commit77171f498c5c7dca09448ce8168c3bd30bfe3825 (patch)
treec03d565b4cd8e9ff847f2599d5abfa84c6886ccf
parent602ae3b5f7cc7dbe429532466237e2e943ae2059 (diff)
downloadqpid-python-77171f498c5c7dca09448ce8168c3bd30bfe3825.tar.gz
QPID-4973 : [Java Broker] Refactor DurableConfigurationStore interface to be in terms of ConfiguredObject rather than implementation classes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1500047 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java32
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java53
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java24
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java97
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java30
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java24
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java104
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java28
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java216
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java185
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeExistsException.java39
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeIsAlternateException.java30
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/RequiredExchangeException.java30
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ReservedExchangeNameException.java38
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/UnknownExchangeException.java38
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java25
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java23
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java41
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java44
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java81
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java59
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java6
49 files changed, 865 insertions, 512 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
index 8cf1ad8a83..be91e4a484 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
@@ -70,7 +70,7 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
_messageStore.addEventListener(new AfterInitialisationListener(), Event.AFTER_INIT);
_messageStore.addEventListener(new BeforePassivationListener(), Event.BEFORE_PASSIVATE);
- VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this);
+ VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getExchangeRegistry(), getExchangeFactory());
_messageStore.configureConfigStore(getName(),
recoveryHandler,
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 3f6489cb86..53dd6df599 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -63,7 +63,6 @@ public abstract class AbstractExchange implements Exchange
private Exchange _alternateExchange;
private boolean _durable;
- private int _ticket;
private VirtualHost _virtualHost;
@@ -109,14 +108,17 @@ public abstract class AbstractExchange implements Exchange
return _type.getName();
}
- public void initialise(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete)
+ public void initialise(UUID id,
+ VirtualHost host,
+ AMQShortString name,
+ boolean durable,
+ boolean autoDelete)
throws AMQException
{
_virtualHost = host;
_name = name;
_durable = durable;
_autoDelete = autoDelete;
- _ticket = ticket;
_id = id;
_logSubject = new ExchangeLogSubject(this, this.getVirtualHost());
@@ -135,11 +137,6 @@ public abstract class AbstractExchange implements Exchange
return _autoDelete;
}
- public int getTicket()
- {
- return _ticket;
- }
-
public void close() throws AMQException
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
index dad6e60bfe..2873eb31e8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
@@ -49,7 +49,6 @@ public class DefaultExchange implements Exchange
private UUID _id;
private VirtualHost _virtualHost;
- private int _ticket;
private static final Logger _logger = Logger.getLogger(DefaultExchange.class);
private final AtomicBoolean _closed = new AtomicBoolean();
@@ -62,12 +61,10 @@ public class DefaultExchange implements Exchange
VirtualHost host,
AMQShortString name,
boolean durable,
- int ticket,
boolean autoDelete) throws AMQException
{
_id = id;
_virtualHost = host;
- _ticket = ticket;
}
@Override
@@ -197,12 +194,6 @@ public class DefaultExchange implements Exchange
}
@Override
- public int getTicket()
- {
- return _ticket;
- }
-
- @Override
public void close() throws AMQException
{
if(_closed.compareAndSet(false,true))
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
index 5e6e36d330..a0b80a601c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
@@ -102,51 +102,45 @@ public class DefaultExchangeFactory implements ExchangeFactory
public Collection<ExchangeType<? extends Exchange>> getPublicCreatableTypes()
{
- Collection<ExchangeType<? extends Exchange>> publicTypes =
+ Collection<ExchangeType<? extends Exchange>> publicTypes =
new ArrayList<ExchangeType<? extends Exchange>>();
publicTypes.addAll(_exchangeClassMap.values());
-
+
return publicTypes;
}
public Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete)
- throws AMQException
+ throws AMQException
{
- return createExchange(new AMQShortString(exchange), new AMQShortString(type), durable, autoDelete, 0);
- }
- public Exchange createExchange(UUID id, String exchange, String type, boolean durable, boolean autoDelete)
- throws AMQException
- {
- return createExchange(id, new AMQShortString(exchange), new AMQShortString(type), durable, autoDelete, 0);
+ UUID id = UUIDGenerator.generateExchangeUUID(exchange, _host.getName());
+ return createExchange(id, new AMQShortString(exchange), new AMQShortString(type), durable, autoDelete);
}
- public Exchange createExchange(AMQShortString exchange, AMQShortString type, boolean durable,
- boolean autoDelete, int ticket)
+ public Exchange createExchange(UUID id, String exchange, String type, boolean durable, boolean autoDelete)
throws AMQException
{
- UUID id = UUIDGenerator.generateExchangeUUID(exchange.asString(), _host.getName());
- return createExchange(id, exchange, type, durable, autoDelete, ticket);
+ return createExchange(id, new AMQShortString(exchange), new AMQShortString(type), durable, autoDelete);
}
- public Exchange createExchange(UUID id, AMQShortString exchange, AMQShortString type, boolean durable,
- boolean autoDelete, int ticket)
+ private Exchange createExchange(UUID id, AMQShortString exchange, AMQShortString type, boolean durable,
+ boolean autoDelete)
throws AMQException
{
// Check access
if (!_host.getSecurityManager().authoriseCreateExchange(autoDelete, durable, exchange, null, null, null, type))
{
- String description = "Permission denied: exchange-name '" + exchange.asString() + "'";
+ String description = "Permission denied: exchange-name '" + exchange + "'";
throw new AMQSecurityException(description);
}
-
+
ExchangeType<? extends Exchange> exchType = _exchangeClassMap.get(type);
if (exchType == null)
{
throw new AMQUnknownExchangeType("Unknown exchange type: " + type,null);
}
-
- Exchange e = exchType.newInstance(id, _host, exchange, durable, ticket, autoDelete);
+
+ Exchange e = exchType.newInstance(id, _host, exchange, durable, autoDelete);
return e;
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index 68c15779a0..75c489c731 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -24,12 +24,10 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
@@ -46,8 +44,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
/**
* Maps from exchange name to exchange instance
*/
- private ConcurrentMap<AMQShortString, Exchange> _exchangeMap = new ConcurrentHashMap<AMQShortString, Exchange>();
- private ConcurrentMap<String, Exchange> _exchangeMapStr = new ConcurrentHashMap<String, Exchange>();
+ private ConcurrentMap<String, Exchange> _exchangeMap = new ConcurrentHashMap<String, Exchange>();
private Exchange _defaultExchange;
private VirtualHost _host;
@@ -59,17 +56,17 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
_host = host;
}
- public void initialise() throws AMQException
+ public void initialise(ExchangeFactory exchangeFactory) throws AMQException
{
//create 'standard' exchanges:
- new ExchangeInitialiser().initialise(_host.getExchangeFactory(), this, getDurableConfigurationStore());
+ new ExchangeInitialiser().initialise(exchangeFactory, this, getDurableConfigurationStore());
_defaultExchange = new DefaultExchange();
UUID defaultExchangeId =
UUIDGenerator.generateExchangeUUID(ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString(), _host.getName());
- _defaultExchange.initialise(defaultExchangeId, _host, ExchangeDefaults.DEFAULT_EXCHANGE_NAME,false,0,false);
+ _defaultExchange.initialise(defaultExchangeId, _host, ExchangeDefaults.DEFAULT_EXCHANGE_NAME,false, false);
}
@@ -80,8 +77,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
public void registerExchange(Exchange exchange) throws AMQException
{
- _exchangeMap.put(exchange.getNameShortString(), exchange);
- _exchangeMapStr.put(exchange.getNameShortString().toString(), exchange);
+ _exchangeMap.put(exchange.getNameShortString().toString(), exchange);
synchronized (_listeners)
{
for(RegistryChangeListener listener : _listeners)
@@ -102,12 +98,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
return _defaultExchange;
}
- public Collection<AMQShortString> getExchangeNames()
- {
- return _exchangeMap.keySet();
- }
-
- public void unregisterExchange(AMQShortString name, boolean inUse) throws AMQException
+ public void unregisterExchange(String name, boolean inUse) throws AMQException
{
final Exchange exchange = _exchangeMap.get(name);
if (exchange == null)
@@ -123,13 +114,8 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
// TODO: check inUse argument
Exchange e = _exchangeMap.remove(name);
- _exchangeMapStr.remove(name.toString());
if (e != null)
{
- if (e.isDurable())
- {
- DurableConfigurationStoreHelper.removeExchange(getDurableConfigurationStore(), e);
- }
e.close();
synchronized (_listeners)
@@ -147,11 +133,6 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
}
}
- public void unregisterExchange(String name, boolean inUse) throws AMQException
- {
- unregisterExchange(new AMQShortString(name), inUse);
- }
-
public Collection<Exchange> getExchanges()
{
return new ArrayList<Exchange>(_exchangeMap.values());
@@ -162,19 +143,6 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
_listeners.add(listener);
}
- public Exchange getExchange(AMQShortString name)
- {
- if ((name == null) || name.length() == 0)
- {
- return getDefaultExchange();
- }
- else
- {
- return _exchangeMap.get(name);
- }
-
- }
-
public Exchange getExchange(String name)
{
if ((name == null) || name.length() == 0)
@@ -183,17 +151,15 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
}
else
{
- return _exchangeMapStr.get(name);
+ return _exchangeMap.get(name);
}
}
@Override
public void clearAndUnregisterMbeans()
{
- for (final AMQShortString exchangeName : getExchangeNames())
+ for (final Exchange exchange : getExchanges())
{
- final Exchange exchange = getExchange(exchangeName);
-
//TODO: this is a bit of a hack, what if the listeners aren't aware
//that we are just unregistering the MBean because of HA, and aren't
//actually removing the exchange as such.
@@ -206,7 +172,6 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
}
}
_exchangeMap.clear();
- _exchangeMapStr.clear();
}
@Override
@@ -237,7 +202,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
{
return true;
}
- Collection<ExchangeType<? extends Exchange>> registeredTypes = _host.getExchangeFactory().getRegisteredTypes();
+ Collection<ExchangeType<? extends Exchange>> registeredTypes = _host.getExchangeTypes();
for (ExchangeType<? extends Exchange> type : registeredTypes)
{
if (type.getDefaultExchangeName().toString().equals(name))
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java
index 096d5265ed..c193764edc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java
@@ -36,13 +36,12 @@ public class DirectExchangeType implements ExchangeType<DirectExchange>
}
public DirectExchange newInstance(UUID id, VirtualHost host,
- AMQShortString name,
- boolean durable,
- int ticket,
- boolean autoDelete) throws AMQException
+ AMQShortString name,
+ boolean durable,
+ boolean autoDelete) throws AMQException
{
DirectExchange exch = new DirectExchange();
- exch.initialise(id, host,name,durable,ticket,autoDelete);
+ exch.initialise(id, host,name,durable, autoDelete);
return exch;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index b632c68ace..735072cc82 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -39,7 +39,7 @@ import java.util.UUID;
public interface Exchange extends ExchangeReferrer
{
- void initialise(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete)
+ void initialise(UUID id, VirtualHost host, AMQShortString name, boolean durable, boolean autoDelete)
throws AMQException;
@@ -60,8 +60,6 @@ public interface Exchange extends ExchangeReferrer
*/
boolean isAutoDelete();
- int getTicket();
-
Exchange getAlternateExchange();
void setAlternateExchange(Exchange exchange);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
index e602d476d9..3ccfa51499 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,19 +30,13 @@ import java.util.UUID;
public interface ExchangeFactory
{
- Exchange createExchange(AMQShortString exchange, AMQShortString type, boolean durable, boolean autoDelete,
- int ticket)
- throws AMQException;
Collection<ExchangeType<? extends Exchange>> getRegisteredTypes();
-
+
Collection<ExchangeType<? extends Exchange>> getPublicCreatableTypes();
Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete) throws AMQException;
Exchange createExchange(UUID id, String exchange, String type, boolean durable, boolean autoDelete) throws AMQException;
- Exchange createExchange(UUID id, AMQShortString exchange, AMQShortString type, boolean durable,
- boolean autoDelete, int ticket)
- throws AMQException;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java
index 10a733546c..fd7c6a7fe0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java
@@ -33,17 +33,17 @@ public class ExchangeInitialiser
{
for (ExchangeType<? extends Exchange> type : factory.getRegisteredTypes())
{
- define (registry, factory, type.getDefaultExchangeName(), type.getName(), store);
+ define (registry, factory, type.getDefaultExchangeName().toString(), type.getName().toString(), store);
}
}
private void define(ExchangeRegistry r, ExchangeFactory f,
- AMQShortString name, AMQShortString type, DurableConfigurationStore store) throws AMQException
+ String name, String type, DurableConfigurationStore store) throws AMQException
{
if(r.getExchange(name)== null)
{
- Exchange exchange = f.createExchange(name, type, true, false, 0);
+ Exchange exchange = f.createExchange(name, type, true, false);
r.registerExchange(exchange);
if(exchange.isDurable())
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
index 4dcedb4797..53ee7de661 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
@@ -31,27 +31,19 @@ public interface ExchangeRegistry
{
void registerExchange(Exchange exchange) throws AMQException;
- /**
- * Unregister an exchange
- * @param name name of the exchange to delete
- * @param inUse if true, do NOT delete the exchange if it is in use (has queues bound to it)
- * @throws ExchangeInUseException when the exchange cannot be deleted because it is in use
- * @throws AMQException
- */
- void unregisterExchange(AMQShortString name, boolean inUse) throws ExchangeInUseException, AMQException;
-
- Exchange getExchange(AMQShortString name);
-
- void setDefaultExchange(Exchange exchange);
-
Exchange getDefaultExchange();
- Collection<AMQShortString> getExchangeNames();
-
- void initialise() throws AMQException;
+ void initialise(ExchangeFactory exchangeFactory) throws AMQException;
Exchange getExchange(String exchangeName);
+ /**
+ * Unregister an exchange
+ * @param exchange name of the exchange to delete
+ * @param ifUnused if true, do NOT delete the exchange if it is in use (has queues bound to it)
+ * @throws ExchangeInUseException when the exchange cannot be deleted because it is in use
+ * @throws AMQException
+ */
void unregisterExchange(String exchange, boolean ifUnused) throws ExchangeInUseException, AMQException;
void clearAndUnregisterMbeans();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java
index 0371a363de..587761b64e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java
@@ -36,11 +36,11 @@ public class FanoutExchangeType implements ExchangeType<FanoutExchange>
}
public FanoutExchange newInstance(UUID id, VirtualHost host, AMQShortString name,
- boolean durable, int ticket, boolean autoDelete)
+ boolean durable, boolean autoDelete)
throws AMQException
{
FanoutExchange exch = new FanoutExchange();
- exch.initialise(id, host, name, durable, ticket, autoDelete);
+ exch.initialise(id, host, name, durable, autoDelete);
return exch;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java
index ed4d57d0f8..1c99fbb364 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java
@@ -35,12 +35,12 @@ public class HeadersExchangeType implements ExchangeType<HeadersExchange>
return ExchangeDefaults.HEADERS_EXCHANGE_CLASS;
}
- public HeadersExchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket,
- boolean autoDelete) throws AMQException
+ public HeadersExchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable,
+ boolean autoDelete) throws AMQException
{
HeadersExchange exch = new HeadersExchange();
- exch.initialise(id, host, name, durable, ticket, autoDelete);
+ exch.initialise(id, host, name, durable, autoDelete);
return exch;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java
index 25a3549e61..d921901f0f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java
@@ -36,13 +36,12 @@ public class TopicExchangeType implements ExchangeType<TopicExchange>
}
public TopicExchange newInstance(UUID id, VirtualHost host,
- AMQShortString name,
- boolean durable,
- int ticket,
- boolean autoDelete) throws AMQException
+ AMQShortString name,
+ boolean durable,
+ boolean autoDelete) throws AMQException
{
TopicExchange exch = new TopicExchange();
- exch.initialise(id, host, name, durable, ticket, autoDelete);
+ exch.initialise(id, host, name, durable, autoDelete);
return exch;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
index cb8918e847..85fee94143 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
@@ -67,7 +67,7 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basic
}
VirtualHost vHost = session.getVirtualHost();
- Exchange exch = vHost.getExchangeRegistry().getExchange(exchangeName);
+ Exchange exch = vHost.getExchange(exchangeName.toString());
// if the exchange does not exist we raise a channel exception
if (exch == null)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
index 2e6a98d81b..8493e97215 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
@@ -88,7 +88,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
{
throw new AMQException("Exchange exchange must not be null");
}
- Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
+ Exchange exchange = virtualHost.getExchange(exchangeName.toString());
ExchangeBoundOkBody response;
if (exchange == null)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
index b3967689dc..437869ab01 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
@@ -32,12 +32,11 @@ import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
+import org.apache.qpid.server.virtualhost.ExchangeExistsException;
+import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class ExchangeDeclareHandler implements StateAwareMethodListener<ExchangeDeclareBody>
@@ -59,8 +58,6 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
{
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
- ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
- ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
final AMQChannel channel = session.getChannel(channelId);
if (channel == null)
{
@@ -73,57 +70,63 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
_logger.debug("Request to declare exchange of type " + body.getType() + " with name " + exchangeName);
}
- synchronized(exchangeRegistry)
+ Exchange exchange;
+
+ if (body.getPassive())
{
- Exchange exchange = exchangeRegistry.getExchange(exchangeName);
+ exchange = virtualHost.getExchange(exchangeName == null ? null : exchangeName.toString());
+ if(exchange == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName);
+ }
+ else if (!exchange.getTypeShortString().equals(body.getType()) && !(body.getType() == null || body.getType().length() ==0))
+ {
+
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " +
+ exchangeName + " of type " + exchange.getTypeShortString()
+ + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null);
+ }
- if (exchange == null)
+ }
+ else
+ {
+ try
{
- if(body.getPassive() && ((body.getType() == null) || body.getType().length() ==0))
- {
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName);
- }
- else if(exchangeName.startsWith("amq."))
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Attempt to declare exchange: " + exchangeName +
- " which begins with reserved prefix 'amq.'.");
- }
- else if(exchangeName.startsWith("qpid."))
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Attempt to declare exchange: " + exchangeName +
- " which begins with reserved prefix 'qpid.'.");
- }
- else
+ exchange = virtualHost.createExchange(null,
+ exchangeName == null ? null : exchangeName.intern().toString(),
+ body.getType() == null ? null : body.getType().intern().toString(),
+ body.getDurable(),
+ body.getAutoDelete(),
+ null);
+
+ }
+ catch(ReservedExchangeNameException e)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Attempt to declare exchange: " + exchangeName +
+ " which begins with reserved prefix.");
+
+ }
+ catch(ExchangeExistsException e)
+ {
+ exchange = e.getExistingExchange();
+ if(!exchange.getTypeShortString().equals(body.getType()))
{
- try
- {
- exchange = exchangeFactory.createExchange(exchangeName == null ? null : exchangeName.intern(),
- body.getType() == null ? null : body.getType().intern(),
- body.getDurable(),
- body.getAutoDelete(), body.getTicket());
- exchangeRegistry.registerExchange(exchange);
-
- if (exchange.isDurable())
- {
- DurableConfigurationStoreHelper.createExchange(virtualHost.getDurableConfigurationStore(),
- exchange);
- }
- }
- catch(AMQUnknownExchangeType e)
- {
- throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + exchangeName,e);
- }
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
+ + exchangeName + " of type "
+ + exchange.getTypeShortString()
+ + " to " + body.getType() +".",
+ body.getClazz(), body.getMethod(),
+ body.getMajor(), body.getMinor(),null);
}
}
- else if (!exchange.getTypeShortString().equals(body.getType()) && !((body.getType() == null || body.getType().length() ==0) && body.getPassive()))
+ catch(AMQUnknownExchangeType e)
{
-
- throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " +
- exchangeName + " of type " + exchange.getTypeShortString() + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null);
+ throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + exchangeName,e);
}
}
+
+
if(!body.getNowait())
{
MethodRegistry methodRegistry = session.getMethodRegistry();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
index 339085691f..25d0182202 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,16 +21,21 @@
package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ExchangeDeleteBody;
import org.apache.qpid.framing.ExchangeDeleteOkBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeInUseException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
+import org.apache.qpid.server.virtualhost.RequiredExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.ExecutionErrorCode;
public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeDeleteBody>
{
@@ -49,7 +54,6 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD
{
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
- ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
final AMQChannel channel = session.getChannel(channelId);
if (channel == null)
{
@@ -58,14 +62,18 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD
channel.sync();
try
{
- if(exchangeRegistry.getExchange(body.getExchange()) == null)
+ final String exchangeName = body.getExchange() == null ? null : body.getExchange().toString();
+
+ final Exchange exchange = virtualHost.getExchange(exchangeName);
+ if(exchange == null)
{
throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange());
}
- exchangeRegistry.unregisterExchange(body.getExchange(), body.getIfUnused());
+
+ virtualHost.removeExchange(exchange, !body.getIfUnused());
ExchangeDeleteOkBody responseBody = session.getMethodRegistry().createExchangeDeleteOkBody();
-
+
session.writeFrame(responseBody.generateFrame(channelId));
}
catch (ExchangeInUseException e)
@@ -73,5 +81,15 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD
throw body.getChannelException(AMQConstant.IN_USE, "Exchange in use");
// TODO: sort out consistent channel close mechanism that does all clean up etc.
}
+
+ catch (ExchangeIsAlternateException e)
+ {
+ throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange");
+
+ }
+ catch (RequiredExchangeException e)
+ {
+ throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange '"+body.getExchange()+"' cannot be deleted");
+ }
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
index 0501443efa..63b8bf3136 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -32,7 +33,6 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
@@ -62,7 +62,6 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
{
AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
VirtualHost virtualHost = protocolConnection.getVirtualHost();
- ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
AMQChannel channel = protocolConnection.getChannel(channelId);
@@ -103,10 +102,11 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
{
throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
}
- final Exchange exch = exchangeRegistry.getExchange(body.getExchange());
+ final String exchangeName = body.getExchange() == null ? null : body.getExchange().toString();
+ final Exchange exch = virtualHost.getExchange(exchangeName);
if (exch == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + exchangeName + " does not exist.");
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 521f27885f..035bf25e04 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -62,7 +62,6 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
final AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
final AMQSessionModel session = protocolConnection.getChannel(channelId);
VirtualHost virtualHost = protocolConnection.getVirtualHost();
- ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
index 523c7acd88..e0e814537a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
@@ -1,5 +1,5 @@
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -7,16 +7,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
package org.apache.qpid.server.handler;
@@ -60,7 +60,6 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB
{
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
- ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
@@ -97,7 +96,7 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB
{
throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
}
- final Exchange exch = exchangeRegistry.getExchange(body.getExchange());
+ final Exchange exch = virtualHost.getExchange(body.getExchange() == null ? null : body.getExchange().toString());
if (exch == null)
{
throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
@@ -112,7 +111,7 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB
exch.removeBinding(String.valueOf(routingKey), queue, FieldTable.convertToMap(body.getArguments()));
}
-
+
if (_log.isInfoEnabled())
{
_log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
index 5d3d507fff..07083fc661 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
@@ -42,6 +42,8 @@ import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.util.MapValueConverter;
+import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
+import org.apache.qpid.server.virtualhost.RequiredExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apache.qpid.server.exchange.Exchange.BindingListener
@@ -121,7 +123,7 @@ final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apa
return createBinding(bindingKey, queue, bindingArgs, attributes);
}
-
+
public org.apache.qpid.server.model.Binding createBinding(String bindingKey, Queue queue,
Map<String, Object> bindingArguments,
Map<String, Object> attributes)
@@ -165,21 +167,11 @@ final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apa
{
try
{
- ExchangeRegistry exchangeRegistry = _vhost.getVirtualHost().getExchangeRegistry();
- if (exchangeRegistry.isReservedExchangeName(getName()))
- {
- throw new UnsupportedOperationException("'" + getName() + "' is a reserved exchange and can't be deleted");
- }
-
- if(_exchange.hasReferrers())
- {
- throw new AMQException( AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange", null);
- }
-
- synchronized(exchangeRegistry)
- {
- exchangeRegistry.unregisterExchange(getName(), false);
- }
+ _vhost.getVirtualHost().removeExchange(_exchange, true);
+ }
+ catch(RequiredExchangeException e)
+ {
+ throw new UnsupportedOperationException("'" + getName() + "' is a reserved exchange and can't be deleted");
}
catch(AMQException e)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
index 74b826da91..c09dd9449e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
@@ -81,11 +81,13 @@ import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.plugin.VirtualHostFactory;
+import org.apache.qpid.server.virtualhost.ExchangeExistsException;
+import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
+import org.apache.qpid.server.virtualhost.UnknownExchangeException;
+import org.apache.qpid.server.virtualhost.VirtualHostListener;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-public final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, ExchangeRegistry.RegistryChangeListener,
- QueueRegistry.RegistryChangeListener,
- IConnectionRegistry.RegistryChangeListener
+public final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, VirtualHostListener
{
private static final Logger LOGGER = Logger.getLogger(VirtualHostAdapter.class);
@@ -184,7 +186,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
private void populateExchanges()
{
Collection<org.apache.qpid.server.exchange.Exchange> actualExchanges =
- _virtualHost.getExchangeRegistry().getExchanges();
+ _virtualHost.getExchanges();
synchronized (_exchangeAdapters)
{
@@ -296,31 +298,81 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
try
{
- ExchangeRegistry exchangeRegistry = _virtualHost.getExchangeRegistry();
- if (exchangeRegistry.isReservedExchangeName(name))
+ String alternateExchange = null;
+ if(attributes.containsKey(Exchange.ALTERNATE_EXCHANGE))
{
- throw new UnsupportedOperationException("'" + name + "' is a reserved exchange name");
- }
- synchronized(exchangeRegistry)
- {
- org.apache.qpid.server.exchange.Exchange exchange = exchangeRegistry.getExchange(name);
- if (exchange != null)
+ Object altExchangeObject = attributes.get(Exchange.ALTERNATE_EXCHANGE);
+ if(altExchangeObject instanceof Exchange)
{
- throw new IllegalArgumentException("Exchange with name '" + name + "' already exists");
+ alternateExchange = ((Exchange) altExchangeObject).getName();
}
- exchange = _virtualHost.getExchangeFactory().createExchange(name, type, durable,
- lifetime == LifetimePolicy.AUTO_DELETE);
- _virtualHost.getExchangeRegistry().registerExchange(exchange);
- if(durable)
+ else if(altExchangeObject instanceof UUID)
{
- DurableConfigurationStoreHelper.createExchange(_virtualHost.getDurableConfigurationStore(),
- exchange);
+ for(Exchange ex : getExchanges())
+ {
+ if(altExchangeObject.equals(ex.getId()))
+ {
+ alternateExchange = ex.getName();
+ break;
+ }
+ }
}
- synchronized (_exchangeAdapters)
+ else if(altExchangeObject instanceof String)
{
- return _exchangeAdapters.get(exchange);
+
+ for(Exchange ex : getExchanges())
+ {
+ if(altExchangeObject.equals(ex.getName()))
+ {
+ alternateExchange = ex.getName();
+ break;
+ }
+ }
+ if(alternateExchange == null)
+ {
+ try
+ {
+ UUID id = UUID.fromString(altExchangeObject.toString());
+ for(Exchange ex : getExchanges())
+ {
+ if(id.equals(ex.getId()))
+ {
+ alternateExchange = ex.getName();
+ break;
+ }
+ }
+ }
+ catch(IllegalArgumentException e)
+ {
+ // ignore
+ }
+
+ }
}
}
+ org.apache.qpid.server.exchange.Exchange exchange = _virtualHost.createExchange(null,
+ name,
+ type,
+ durable,
+ lifetime == LifetimePolicy.AUTO_DELETE,
+ alternateExchange);
+ synchronized (_exchangeAdapters)
+ {
+ return _exchangeAdapters.get(exchange);
+ }
+
+ }
+ catch(ExchangeExistsException e)
+ {
+ throw new IllegalArgumentException("Exchange with name '" + name + "' already exists");
+ }
+ catch(ReservedExchangeNameException e)
+ {
+ throw new UnsupportedOperationException("'" + name + "' is a reserved exchange name");
+ }
+ catch(UnknownExchangeException e)
+ {
+ throw new IllegalArgumentException("Alternate Exchange with name '" + e.getExchangeName() + "' does not exist");
}
catch(AMQException e)
{
@@ -726,7 +778,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
public Collection<String> getExchangeTypes()
{
Collection<ExchangeType<? extends org.apache.qpid.server.exchange.Exchange>> types =
- _virtualHost.getExchangeFactory().getRegisteredTypes();
+ _virtualHost.getExchangeTypes();
Collection<String> exchangeTypes = new ArrayList<String>();
@@ -884,7 +936,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
if(SUPPORTED_EXCHANGE_TYPES.equals(name))
{
List<String> types = new ArrayList<String>();
- for(@SuppressWarnings("rawtypes") ExchangeType type : _virtualHost.getExchangeFactory().getRegisteredTypes())
+ for(@SuppressWarnings("rawtypes") ExchangeType type : _virtualHost.getExchangeTypes())
{
types.add(type.getName().asString());
}
@@ -1009,7 +1061,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
}
else if(VirtualHost.EXCHANGE_COUNT.equals(name))
{
- return _vhost.getExchangeRegistry().getExchanges().size();
+ return _vhost.getExchanges().size();
}
else if(VirtualHost.CONNECTION_COUNT.equals(name))
{
@@ -1127,11 +1179,9 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
virtualHostRegistry.registerVirtualHost(_virtualHost);
_statistics = new VirtualHostStatisticsAdapter(_virtualHost);
- _virtualHost.getQueueRegistry().addRegistryChangeListener(this);
+ _virtualHost.addVirtualHostListener(this);
populateQueues();
- _virtualHost.getExchangeRegistry().addRegistryChangeListener(this);
populateExchanges();
- _virtualHost.getConnectionRegistry().addRegistryChangeListener(this);
synchronized(_aliases)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java
index 72d14456ed..0c14c06624 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java
@@ -102,7 +102,7 @@ public class HeaderPropertiesConverter
exchangeName = "";
}
- Exchange exchange = vhost.getExchangeRegistry().getExchange(exchangeName);
+ Exchange exchange = vhost.getExchange(exchangeName);
String exchangeClass = exchange == null ? ExchangeDefaults.DIRECT_EXCHANGE_CLASS.asString() : exchange.getType().getName().asString();
props.setReplyTo(exchangeClass + "://"+exchangeName+"//?routingkey='"+(routingKey==null ? "" : routingKey+"'"));
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java
index 40ef6ad6a2..7bd0728850 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java
@@ -31,6 +31,6 @@ public interface ExchangeType<T extends Exchange>
{
public AMQShortString getName();
public T newInstance(UUID id, VirtualHost host, AMQShortString name,
- boolean durable, int ticket, boolean autoDelete) throws AMQException;
+ boolean durable, boolean autoDelete) throws AMQException;
public AMQShortString getDefaultExchangeName();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index a72671762c..ca67b6f79b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -215,7 +215,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
List<Binding> bindingsToRemove = new ArrayList<Binding>();
for(Binding existingBinding : bindings)
{
- if(existingBinding.getExchange() != _vhost.getExchangeRegistry().getDefaultExchange()
+ if(existingBinding.getExchange() != _vhost.getDefaultExchange()
&& existingBinding.getExchange() != exchange)
{
bindingsToRemove.add(existingBinding);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 2b39fb1ff0..ce6ef0b183 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -115,7 +115,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
}
else
{
- Exchange exchg = _vhost.getExchangeRegistry().getExchange(addr);
+ Exchange exchg = _vhost.getExchange(addr);
if(exchg != null)
{
destination = new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy());
@@ -244,7 +244,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
}
String addr = target.getAddress();
- Exchange exchg = _vhost.getExchangeRegistry().getExchange(addr);
+ Exchange exchg = _vhost.getExchange(addr);
if(exchg != null)
{
destination = new ExchangeDestination(exchg, target.getDurable(),
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index f9e678525f..1eeb6dccf3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -38,6 +38,7 @@ import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
+import org.apache.qpid.server.virtualhost.ExchangeExistsException;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class AMQQueueFactory
@@ -300,25 +301,22 @@ public class AMQQueueFactory
final String dlExchangeName = getDeadLetterExchangeName(queueName);
final String dlQueueName = getDeadLetterQueueName(queueName);
- final ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
- final ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
final QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
Exchange dlExchange = null;
- synchronized(exchangeRegistry)
- {
- dlExchange = exchangeRegistry.getExchange(dlExchangeName);
-
- if(dlExchange == null)
- {
- dlExchange = exchangeFactory.createExchange(UUIDGenerator.generateExchangeUUID(dlExchangeName, virtualHost.getName()), new AMQShortString(dlExchangeName), ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0);
-
- exchangeRegistry.registerExchange(dlExchange);
+ final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, virtualHost.getName());
- //enter the dle in the persistent store
- DurableConfigurationStoreHelper.createExchange(virtualHost.getDurableConfigurationStore(),
- dlExchange);
- }
+ try
+ {
+ dlExchange = virtualHost.createExchange(dlExchangeId,
+ dlExchangeName,
+ ExchangeDefaults.FANOUT_EXCHANGE_CLASS.toString(),
+ true, false, null);
+ }
+ catch(ExchangeExistsException e)
+ {
+ // We're ok if the exchange already exists
+ dlExchange = e.getExistingExchange();
}
AMQQueue dlQueue = null;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 889fe7c5c1..b7ac86795d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -63,6 +63,11 @@ import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.SuspendAndFailDtxException;
import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
+import org.apache.qpid.server.virtualhost.ExchangeExistsException;
+import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
+import org.apache.qpid.server.virtualhost.RequiredExchangeException;
+import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
+import org.apache.qpid.server.virtualhost.UnknownExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.*;
@@ -185,7 +190,7 @@ public class ServerSessionDelegate extends SessionDelegate
if(!method.hasQueue())
{
- exception(session,method,ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not supplied");
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not supplied");
}
else
{
@@ -684,7 +689,6 @@ public class ServerSessionDelegate extends SessionDelegate
{
String exchangeName = method.getExchange();
VirtualHost virtualHost = getVirtualHost(session);
- ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
//we must check for any unsupported arguments present and throw not-implemented
if(method.hasArguments())
@@ -697,114 +701,79 @@ public class ServerSessionDelegate extends SessionDelegate
return;
}
}
- synchronized(exchangeRegistry)
+
+ if(method.getPassive())
{
Exchange exchange = getExchange(session, exchangeName);
- if(method.getPassive())
+ if(exchange == null)
{
- if(exchange == null)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '" + exchangeName + "'");
- }
- else
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '" + exchangeName + "'");
+ }
+ else
+ {
+ if (!exchange.getTypeShortString().toString().equals(method.getType())
+ && (method.getType() != null && method.getType().length() > 0))
{
- if (!exchange.getTypeShortString().toString().equals(method.getType())
- && (method.getType() != null && method.getType().length() > 0))
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: "
- + exchangeName + " of type " + exchange.getTypeShortString() + " to " + method.getType() + ".");
- }
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: "
+ + exchangeName + " of type " + exchange.getTypeShortString() + " to " + method.getType() + ".");
}
}
- else
+ }
+ else
+ {
+
+ try
{
- if (exchange == null)
+ virtualHost.createExchange(null,
+ method.getExchange(),
+ method.getType(),
+ method.getDurable(),
+ method.getAutoDelete(),
+ method.getAlternateExchange());
+ }
+ catch(ReservedExchangeNameException e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: "
+ + exchangeName + " which begins with reserved name or prefix.");
+ }
+ catch(UnknownExchangeException e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND,
+ "Unknown alternate exchange " + e.getExchangeName());
+ }
+ catch(AMQUnknownExchangeType e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType());
+ }
+ catch(ExchangeExistsException e)
+ {
+ Exchange exchange = e.getExistingExchange();
+ if(!exchange.getTypeShortString().toString().equals(method.getType()))
{
- if (exchangeName.startsWith("amq."))
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: "
- + exchangeName + " which begins with reserved prefix 'amq.'.");
- }
- else if (exchangeName.startsWith("qpid."))
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: "
- + exchangeName + " which begins with reserved prefix 'qpid.'.");
- }
- else
- {
- ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
- try
- {
- exchange = exchangeFactory.createExchange(method.getExchange(),
- method.getType(),
- method.getDurable(),
- method.getAutoDelete());
- String alternateExchangeName = method.getAlternateExchange();
- boolean validAlternate;
- if(alternateExchangeName != null && alternateExchangeName.length() != 0)
- {
- Exchange alternate = getExchange(session, alternateExchangeName);
- if(alternate == null)
- {
- validAlternate = false;
- }
- else
- {
- exchange.setAlternateExchange(alternate);
- validAlternate = true;
- }
- }
- else
- {
- validAlternate = true;
- }
- if(validAlternate)
- {
- if (exchange.isDurable())
- {
- DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
- DurableConfigurationStoreHelper.createExchange(store, exchange);
- }
- exchangeRegistry.registerExchange(exchange);
- }
- else
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND,
- "Unknown alternate exchange " + alternateExchangeName);
- }
- }
- catch(AMQUnknownExchangeType e)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType());
- }
- catch (AMQException e)
- {
- exception(session, method, e, "Cannot declare exchange '" + exchangeName);
- }
- }
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
+ "Attempt to redeclare exchange: " + exchangeName
+ + " of type " + exchange.getTypeShortString()
+ + " to " + method.getType() +".");
}
- else
+ else if(method.hasAlternateExchange()
+ && (exchange.getAlternateExchange() == null ||
+ !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName())))
{
- if(!exchange.getTypeShortString().toString().equals(method.getType()))
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
- "Attempt to redeclare exchange: " + exchangeName
- + " of type " + exchange.getTypeShortString()
- + " to " + method.getType() +".");
- }
- else if(method.hasAlternateExchange()
- && (exchange.getAlternateExchange() == null ||
- !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName())))
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
- "Attempt to change alternate exchange of: " + exchangeName
- + " from " + exchange.getAlternateExchange()
- + " to " + method.getAlternateExchange() +".");
- }
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
+ "Attempt to change alternate exchange of: " + exchangeName
+ + " from " + exchange.getAlternateExchange()
+ + " to " + method.getAlternateExchange() +".");
}
}
+ catch (AMQException e)
+ {
+ exception(session, method, e, "Cannot declare exchange '" + exchangeName);
+ }
+
+
}
+
}
// TODO decouple AMQException and AMQConstant error codes
@@ -841,32 +810,25 @@ public class ServerSessionDelegate extends SessionDelegate
private Exchange getExchange(Session session, String exchangeName)
{
- ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
- return exchangeRegistry.getExchange(exchangeName);
- }
-
- private ExchangeRegistry getExchangeRegistry(Session session)
- {
- VirtualHost virtualHost = getVirtualHost(session);
- return virtualHost.getExchangeRegistry();
-
+ return getVirtualHost(session).getExchange(exchangeName);
}
private Exchange getExchangeForMessage(Session ssn, MessageTransfer xfr)
{
- final ExchangeRegistry exchangeRegistry = getExchangeRegistry(ssn);
+ VirtualHost virtualHost = getVirtualHost(ssn);
+
Exchange exchange;
if(xfr.hasDestination())
{
- exchange = exchangeRegistry.getExchange(xfr.getDestination());
+ exchange = virtualHost.getExchange(xfr.getDestination());
if(exchange == null)
{
- exchange = exchangeRegistry.getDefaultExchange();
+ exchange = virtualHost.getDefaultExchange();
}
}
else
{
- exchange = exchangeRegistry.getDefaultExchange();
+ exchange = virtualHost.getDefaultExchange();
}
return exchange;
}
@@ -888,7 +850,6 @@ public class ServerSessionDelegate extends SessionDelegate
public void exchangeDelete(Session session, ExchangeDelete method)
{
VirtualHost virtualHost = getVirtualHost(session);
- ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
try
{
@@ -904,29 +865,23 @@ public class ServerSessionDelegate extends SessionDelegate
{
exception(session, method, ExecutionErrorCode.NOT_FOUND, "No such exchange '" + method.getExchange() + "'");
}
- else if(exchange.hasReferrers())
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange in use as an alternate exchange");
- }
- else if(isStandardExchange(exchange, virtualHost.getExchangeFactory().getRegisteredTypes()))
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange '"+method.getExchange()+"' cannot be deleted");
- }
else
{
- exchangeRegistry.unregisterExchange(method.getExchange(), method.getIfUnused());
-
- if (exchange.isDurable() && !exchange.isAutoDelete())
- {
- DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
- DurableConfigurationStoreHelper.removeExchange(store, exchange);
- }
+ virtualHost.removeExchange(exchange, !method.getIfUnused());
}
}
catch (ExchangeInUseException e)
{
exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Exchange in use");
}
+ catch (ExchangeIsAlternateException e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange in use as an alternate exchange");
+ }
+ catch (RequiredExchangeException e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange '"+method.getExchange()+"' cannot be deleted");
+ }
catch (AMQException e)
{
exception(session, method, e, "Cannot delete exchange '" + method.getExchange() );
@@ -982,7 +937,6 @@ public class ServerSessionDelegate extends SessionDelegate
{
VirtualHost virtualHost = getVirtualHost(session);
- ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
if (!method.hasQueue())
@@ -1002,7 +956,7 @@ public class ServerSessionDelegate extends SessionDelegate
method.setBindingKey(method.getQueue());
}
AMQQueue queue = queueRegistry.getQueue(method.getQueue());
- Exchange exchange = exchangeRegistry.getExchange(method.getExchange());
+ Exchange exchange = virtualHost.getExchange(method.getExchange());
if(queue == null)
{
exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found");
@@ -1045,7 +999,6 @@ public class ServerSessionDelegate extends SessionDelegate
public void exchangeUnbind(Session session, ExchangeUnbind method)
{
VirtualHost virtualHost = getVirtualHost(session);
- ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
if (!method.hasQueue())
@@ -1063,7 +1016,7 @@ public class ServerSessionDelegate extends SessionDelegate
else
{
AMQQueue queue = queueRegistry.getQueue(method.getQueue());
- Exchange exchange = exchangeRegistry.getExchange(method.getExchange());
+ Exchange exchange = virtualHost.getExchange(method.getExchange());
if(queue == null)
{
exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found");
@@ -1091,11 +1044,12 @@ public class ServerSessionDelegate extends SessionDelegate
{
ExchangeBoundResult result = new ExchangeBoundResult();
+ VirtualHost virtualHost = getVirtualHost(session);
Exchange exchange;
AMQQueue queue;
if(method.hasExchange())
{
- exchange = getExchange(session, method.getExchange());
+ exchange = virtualHost.getExchange(method.getExchange());
if(exchange == null)
{
@@ -1104,7 +1058,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- exchange = getExchangeRegistry(session).getDefaultExchange();
+ exchange = virtualHost.getDefaultExchange();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index e89fa8b545..15f5becec2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -32,7 +34,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.configuration.ExchangeConfiguration;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
@@ -42,10 +43,12 @@ import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeInUseException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
@@ -56,6 +59,7 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
@@ -259,7 +263,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
{
_logger.debug("Loading configuration for virtualhost: " + config.getName());
- _exchangeRegistry.initialise();
+ _exchangeRegistry.initialise(_exchangeFactory);
List<String> exchangeNames = config.getExchanges();
@@ -279,25 +283,18 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
private void configureExchange(ExchangeConfiguration exchangeConfiguration) throws AMQException
{
- AMQShortString exchangeName = new AMQShortString(exchangeConfiguration.getName());
-
- Exchange exchange;
- exchange = _exchangeRegistry.getExchange(exchangeName);
- if (exchange == null)
+ boolean durable = exchangeConfiguration.getDurable();
+ boolean autodelete = exchangeConfiguration.getAutoDelete();
+ try
{
-
- AMQShortString type = new AMQShortString(exchangeConfiguration.getType());
- boolean durable = exchangeConfiguration.getDurable();
- boolean autodelete = exchangeConfiguration.getAutoDelete();
-
- Exchange newExchange = _exchangeFactory.createExchange(exchangeName, type, durable, autodelete, 0);
- _exchangeRegistry.registerExchange(newExchange);
-
- if (newExchange.isDurable())
- {
- DurableConfigurationStoreHelper.createExchange(getDurableConfigurationStore(), newExchange);
- }
+ Exchange newExchange = createExchange(null, exchangeConfiguration.getName(), exchangeConfiguration.getType(), durable, autodelete,
+ null);
}
+ catch(ExchangeExistsException e)
+ {
+ _logger.info("Exchange " + exchangeConfiguration.getName() + " already defined. Configuration in XML file ignored");
+ }
+
}
private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException
@@ -374,16 +371,162 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
return _queueRegistry;
}
- public ExchangeRegistry getExchangeRegistry()
+ protected ExchangeRegistry getExchangeRegistry()
{
return _exchangeRegistry;
}
- public ExchangeFactory getExchangeFactory()
+ protected ExchangeFactory getExchangeFactory()
{
return _exchangeFactory;
}
+ @Override
+ public void addVirtualHostListener(final VirtualHostListener listener)
+ {
+ _exchangeRegistry.addRegistryChangeListener(new ExchangeRegistry.RegistryChangeListener()
+ {
+ @Override
+ public void exchangeRegistered(Exchange exchange)
+ {
+ listener.exchangeRegistered(exchange);
+ }
+
+ @Override
+ public void exchangeUnregistered(Exchange exchange)
+ {
+ listener.exchangeUnregistered(exchange);
+ }
+ });
+ _queueRegistry.addRegistryChangeListener(new QueueRegistry.RegistryChangeListener()
+ {
+ @Override
+ public void queueRegistered(AMQQueue queue)
+ {
+ listener.queueRegistered(queue);
+ }
+
+ @Override
+ public void queueUnregistered(AMQQueue queue)
+ {
+ listener.queueUnregistered(queue);
+ }
+ });
+ _connectionRegistry.addRegistryChangeListener(new IConnectionRegistry.RegistryChangeListener()
+ {
+ @Override
+ public void connectionRegistered(AMQConnectionModel connection)
+ {
+ listener.connectionRegistered(connection);
+ }
+
+ @Override
+ public void connectionUnregistered(AMQConnectionModel connection)
+ {
+ listener.connectionUnregistered(connection);
+ }
+ });
+ }
+
+ @Override
+ public Exchange getExchange(String name)
+ {
+ return _exchangeRegistry.getExchange(name);
+ }
+
+ @Override
+ public Exchange getDefaultExchange()
+ {
+ return _exchangeRegistry.getDefaultExchange();
+ }
+
+ @Override
+ public Collection<Exchange> getExchanges()
+ {
+ return Collections.unmodifiableCollection(_exchangeRegistry.getExchanges());
+ }
+
+ @Override
+ public Collection<ExchangeType<? extends Exchange>> getExchangeTypes()
+ {
+ return _exchangeFactory.getRegisteredTypes();
+ }
+
+ @Override
+ public Exchange createExchange(UUID id,
+ String name,
+ String type,
+ boolean durable,
+ boolean autoDelete,
+ String alternateExchangeName)
+ throws AMQException
+ {
+
+ if(_exchangeRegistry.isReservedExchangeName(name))
+ {
+ throw new ReservedExchangeNameException(name);
+ }
+ synchronized (_exchangeRegistry)
+ {
+ Exchange existing;
+ if((existing = _exchangeRegistry.getExchange(name)) !=null)
+ {
+ throw new ExchangeExistsException(name,existing);
+ }
+ Exchange alternateExchange;
+
+ if(alternateExchangeName != null)
+ {
+ alternateExchange = _exchangeRegistry.getExchange(alternateExchangeName);
+ if(alternateExchange == null)
+ {
+ throw new UnknownExchangeException(alternateExchangeName);
+ }
+ }
+ else
+ {
+ alternateExchange = null;
+ }
+
+ if(id == null)
+ {
+ id = UUIDGenerator.generateExchangeUUID(name, getName());
+ }
+
+ Exchange exchange = _exchangeFactory.createExchange(id, name, type, durable, autoDelete);
+ exchange.setAlternateExchange(alternateExchange);
+ _exchangeRegistry.registerExchange(exchange);
+ if(durable)
+ {
+ DurableConfigurationStoreHelper.createExchange(getDurableConfigurationStore(), exchange);
+ }
+ return exchange;
+ }
+ }
+
+ @Override
+ public void removeExchange(Exchange exchange, boolean force) throws AMQException
+ {
+ if(exchange.hasReferrers())
+ {
+ throw new ExchangeIsAlternateException(exchange.getName());
+ }
+
+ for(ExchangeType type : getExchangeTypes())
+ {
+ if(type.getDefaultExchangeName().toString().equals( exchange.getName() ))
+ {
+ throw new RequiredExchangeException(exchange.getName());
+ }
+ }
+ _exchangeRegistry.unregisterExchange(exchange.getName(), !force);
+ if (exchange.isDurable() && !exchange.isAutoDelete())
+ {
+ DurableConfigurationStoreHelper.removeExchange(getDurableConfigurationStore(), exchange);
+ }
+
+ }
+
public SecurityManager getSecurityManager()
{
return _securityManager;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeExistsException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeExistsException.java
new file mode 100644
index 0000000000..f055760efe
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeExistsException.java
@@ -0,0 +1,39 @@
+package org.apache.qpid.server.virtualhost;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.exchange.Exchange;
+
+public class ExchangeExistsException extends AMQException
+{
+ private final Exchange _existing;
+
+ public ExchangeExistsException(String name, Exchange existing)
+ {
+ super(name);
+ _existing = existing;
+ }
+
+ public Exchange getExistingExchange()
+ {
+ return _existing;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeIsAlternateException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeIsAlternateException.java
new file mode 100644
index 0000000000..4be64a3b94
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeIsAlternateException.java
@@ -0,0 +1,30 @@
+package org.apache.qpid.server.virtualhost;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.qpid.AMQException;
+
+public class ExchangeIsAlternateException extends AMQException
+{
+ public ExchangeIsAlternateException(String name)
+ {
+ super(name);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/RequiredExchangeException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/RequiredExchangeException.java
new file mode 100644
index 0000000000..da4c9825b1
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/RequiredExchangeException.java
@@ -0,0 +1,30 @@
+package org.apache.qpid.server.virtualhost;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.qpid.AMQException;
+
+public class RequiredExchangeException extends AMQException
+{
+ public RequiredExchangeException(String name)
+ {
+ super(name);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ReservedExchangeNameException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ReservedExchangeNameException.java
new file mode 100644
index 0000000000..585f045ad9
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ReservedExchangeNameException.java
@@ -0,0 +1,38 @@
+package org.apache.qpid.server.virtualhost;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.qpid.AMQException;
+
+public class ReservedExchangeNameException extends AMQException
+{
+ private final String _name;
+
+ public ReservedExchangeNameException(String name)
+ {
+ super("Attempt to create an exchange using a reserved name or prefix: " + name);
+ _name = name;
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
index 82be0c01e1..b34444cb4c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
@@ -96,7 +96,7 @@ public class StandardVirtualHost extends AbstractVirtualHost
_durableConfigurationStore = initialiseConfigurationStore(virtualHost);
- VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this);
+ VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getExchangeRegistry(), getExchangeFactory());
_durableConfigurationStore.configureConfigStore(getName(), recoveryHandler, virtualHost);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/UnknownExchangeException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/UnknownExchangeException.java
new file mode 100644
index 0000000000..5704126f62
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/UnknownExchangeException.java
@@ -0,0 +1,38 @@
+package org.apache.qpid.server.virtualhost;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.qpid.AMQException;
+
+public class UnknownExchangeException extends AMQException
+{
+ private final String _exchangeName;
+
+ public UnknownExchangeException(String exchangeName)
+ {
+ super(exchangeName);
+ _exchangeName = exchangeName;
+ }
+
+ public String getExchangeName()
+ {
+ return _exchangeName;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 8919f4d348..2435854912 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -20,13 +20,18 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
+import org.apache.qpid.AMQException;
import org.apache.qpid.common.Closeable;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeInUseException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.SecurityManager;
@@ -45,9 +50,23 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable
QueueRegistry getQueueRegistry();
- ExchangeRegistry getExchangeRegistry();
+ Exchange createExchange(UUID id,
+ String exchange,
+ String type,
+ boolean durable,
+ boolean autoDelete,
+ String alternateExchange)
+ throws AMQException;
- ExchangeFactory getExchangeFactory();
+ void removeExchange(Exchange exchange, boolean force) throws AMQException;
+
+ Exchange getExchange(String name);
+
+ Exchange getDefaultExchange();
+
+ Collection<Exchange> getExchanges();
+
+ Collection<ExchangeType<? extends Exchange>> getExchangeTypes();
DurableConfigurationStore getDurableConfigurationStore();
@@ -55,6 +74,8 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable
SecurityManager getSecurityManager();
+ void addVirtualHostListener(VirtualHostListener listener);
+
void close();
UUID getId();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index 3c7e1395d1..34c42cc9cc 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -31,9 +31,10 @@ import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
@@ -82,12 +83,19 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
private final Map<String, Map<UUID, Map<String, Object>>> _configuredObjects = new HashMap<String, Map<UUID, Map<String, Object>>>();
+ private final ExchangeRegistry _exchangeRegistry;
+ private final ExchangeFactory _exchangeFactory;
+
private MessageStoreLogSubject _logSubject;
private MessageStore _store;
- public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost)
+ public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost,
+ ExchangeRegistry exchangeRegistry,
+ ExchangeFactory exchangeFactory)
{
_virtualHost = virtualHost;
+ _exchangeRegistry = exchangeRegistry;
+ _exchangeFactory = exchangeFactory;
}
@Override
@@ -120,7 +128,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
if (alternateExchangeId != null)
{
- Exchange altExchange = _virtualHost.getExchangeRegistry().getExchange(alternateExchangeId);
+ Exchange altExchange = _exchangeRegistry.getExchange(alternateExchangeId);
if (altExchange == null)
{
_logger.error("Unknown exchange id " + alternateExchangeId + ", cannot set alternate exchange on queue with id " + id);
@@ -146,12 +154,11 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
try
{
Exchange exchange;
- AMQShortString exchangeNameSS = new AMQShortString(exchangeName);
- exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeNameSS);
+ exchange = _exchangeRegistry.getExchange(exchangeName);
if (exchange == null)
{
- exchange = _virtualHost.getExchangeFactory().createExchange(id, exchangeNameSS, new AMQShortString(type), true, autoDelete, 0);
- _virtualHost.getExchangeRegistry().registerExchange(exchange);
+ exchange = _exchangeFactory.createExchange(id, exchangeName, type, true, autoDelete);
+ _exchangeRegistry.registerExchange(exchange);
}
}
catch (AMQException e)
@@ -352,7 +359,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
{
try
{
- Exchange exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeId);
+ Exchange exchange = _exchangeRegistry.getExchange(exchangeId);
if (exchange == null)
{
_logger.error("Unknown exchange id " + exchangeId + ", cannot bind queue with id " + queueId);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java
new file mode 100644
index 0000000000..8527435eea
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public interface VirtualHostListener
+{
+
+ public void queueRegistered(AMQQueue queue);
+
+ public void queueUnregistered(AMQQueue queue);
+
+ public void connectionRegistered(AMQConnectionModel connection);
+
+ public void connectionUnregistered(AMQConnectionModel connection);
+
+ public void exchangeRegistered(Exchange exchange);
+
+ public void exchangeUnregistered(Exchange exchange);
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java
index 341ab1b372..56f118cf7d 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java
@@ -179,8 +179,8 @@ public class DefaultExchangeFactoryTest extends QpidTestCase
}
@Override
- public Exchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket,
- boolean autoDelete) throws AMQException
+ public Exchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable,
+ boolean autoDelete) throws AMQException
{
return null;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
index 7b7e2ec346..f608bc8cb0 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
@@ -64,7 +64,7 @@ public class FanoutExchangeTest extends TestCase
when(securityManager.authoriseBind(any(Exchange.class), any(AMQQueue.class), any(AMQShortString.class))).thenReturn(true);
when(securityManager.authoriseUnbind(any(Exchange.class), any(AMQShortString.class), any(AMQQueue.class))).thenReturn(true);
- _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, 0, false);
+ _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, false);
}
public void testIsBoundAMQShortStringFieldTableAMQQueueWhenQueueIsNull()
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
index 2b965358e0..d76c7d1128 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -68,7 +68,7 @@ public class HeadersExchangeTest extends TestCase
when(securityManager.authoriseBind(any(Exchange.class), any(AMQQueue.class), any(AMQShortString.class))).thenReturn(true);
when(securityManager.authoriseUnbind(any(Exchange.class), any(AMQShortString.class), any(AMQQueue.class))).thenReturn(true);
- _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, 0, false);
+ _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, false);
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
index a33c85dfd1..e63744af9a 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
@@ -45,7 +45,7 @@ public class BindingLogSubjectTest extends AbstractTestLogSubject
_testVhost = BrokerTestHelper.createVirtualHost("test");
_routingKey = new AMQShortString("RoutingKey");
- _exchange = _testVhost.getExchangeRegistry().getExchange("amq.direct");
+ _exchange = _testVhost.getExchange("amq.direct");
_queue = new MockAMQQueue("BindingLogSubjectTest");
((MockAMQQueue) _queue).setVirtualHost(_testVhost);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java
index 775a306bd3..b327738797 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java
@@ -40,7 +40,7 @@ public class ExchangeLogSubjectTest extends AbstractTestLogSubject
_testVhost = BrokerTestHelper.createVirtualHost("test");
- _exchange = _testVhost.getExchangeRegistry().getExchange("amq.direct");
+ _exchange = _testVhost.getExchange("amq.direct");
_subject = new ExchangeLogSubject(_exchange,_testVhost);
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
index 8d5e6b8ee3..c8e0e53d75 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
@@ -142,25 +142,24 @@ public class AMQQueueFactoryTest extends QpidTestCase
fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
String queueName = "testDeadLetterQueueEnabled";
- AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
- AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
+ String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
QueueRegistry qReg = _virtualHost.getQueueRegistry();
- ExchangeRegistry exReg = _virtualHost.getExchangeRegistry();
assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
- assertNull("The alternate exchange should not yet exist", exReg.getExchange(dlExchangeName));
+ assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false,
_virtualHost, FieldTable.convertToMap(fieldTable));
Exchange altExchange = queue.getAlternateExchange();
assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange);
- assertEquals("Alternate exchange name was not as expected", dlExchangeName.asString(), altExchange.getName());
+ assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName());
assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType().getName());
- assertNotNull("The alternate exchange was not registered as expected", exReg.getExchange(dlExchangeName));
- assertEquals("The registered exchange was not the expected exchange instance", altExchange, exReg.getExchange(dlExchangeName));
+ assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getExchange(dlExchangeName));
+ assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getExchange(dlExchangeName));
AMQQueue dlQueue = qReg.getQueue(dlQueueName);
assertNotNull("The DLQ was not registered as expected", dlQueue);
@@ -180,14 +179,13 @@ public class AMQQueueFactoryTest extends QpidTestCase
public void testDeadLetterQueueDoesNotInheritDLQorMDCSettings() throws Exception
{
String queueName = "testDeadLetterQueueEnabled";
- AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
- AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
+ String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
QueueRegistry qReg = _virtualHost.getQueueRegistry();
- ExchangeRegistry exReg = _virtualHost.getExchangeRegistry();
assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
- assertNull("The alternate exchange should not yet exist", exReg.getExchange(dlExchangeName));
+ assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false,
_virtualHost, null);
@@ -195,11 +193,11 @@ public class AMQQueueFactoryTest extends QpidTestCase
assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryCount());
Exchange altExchange = queue.getAlternateExchange();
assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange);
- assertEquals("Alternate exchange name was not as expected", dlExchangeName.toString(), altExchange.getName());
+ assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName());
assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType().getName());
- assertNotNull("The alternate exchange was not registered as expected", exReg.getExchange(dlExchangeName));
- assertEquals("The registered exchange was not the expected exchange instance", altExchange, exReg.getExchange(dlExchangeName));
+ assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getExchange(dlExchangeName));
+ assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getExchange(dlExchangeName));
AMQQueue dlQueue = qReg.getQueue(dlQueueName);
assertNotNull("The DLQ was not registered as expected", dlQueue);
@@ -222,20 +220,19 @@ public class AMQQueueFactoryTest extends QpidTestCase
fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, false);
String queueName = "testDeadLetterQueueDisabled";
- AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
- AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
+ String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
QueueRegistry qReg = _virtualHost.getQueueRegistry();
- ExchangeRegistry exReg = _virtualHost.getExchangeRegistry();
assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
- assertNull("The alternate exchange should not exist", exReg.getExchange(dlExchangeName));
+ assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false,
_virtualHost, FieldTable.convertToMap(fieldTable));
assertNull("Queue should not have an alternate exchange as DLQ is disabled", queue.getAlternateExchange());
- assertNull("The alternate exchange should still not exist", exReg.getExchange(dlExchangeName));
+ assertNull("The alternate exchange should still not exist", _virtualHost.getExchange(dlExchangeName));
assertNull("The DLQ should still not exist", qReg.getQueue(dlQueueName));
@@ -255,14 +252,13 @@ public class AMQQueueFactoryTest extends QpidTestCase
fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
String queueName = "testDeadLetterQueueNotCreatedForAutodeleteQueues";
- AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
- AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
+ String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
QueueRegistry qReg = _virtualHost.getQueueRegistry();
- ExchangeRegistry exReg = _virtualHost.getExchangeRegistry();
assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
- assertNull("The alternate exchange should not exist", exReg.getExchange(dlExchangeName));
+ assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName));
//create an autodelete queue
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", true, false,
@@ -271,7 +267,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
//ensure that the autodelete property overrides the request to enable DLQ
assertNull("Queue should not have an alternate exchange as queue is autodelete", queue.getAlternateExchange());
- assertNull("The alternate exchange should not exist as queue is autodelete", exReg.getExchange(dlExchangeName));
+ assertNull("The alternate exchange should not exist as queue is autodelete", _virtualHost.getExchange(dlExchangeName));
assertNull("The DLQ should not exist as queue is autodelete", qReg.getQueue(dlQueueName));
//only 1 queue should have been registered
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 505c47a69b..c37e6da729 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -112,7 +112,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
_queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), _qname.asString(), false, _owner.asString(),
false, false, _virtualHost, FieldTable.convertToMap(_arguments));
- _exchange = (DirectExchange) _virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME);
+ _exchange = (DirectExchange) _virtualHost.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString());
}
@Override
@@ -423,7 +423,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
assertEquals("Unexpected total number of messages sent to both after enqueue", 2, subscription1.getMessages().size() + subscription2.getMessages().size());
/* Now release the first message only, causing it to be requeued */
- queueEntries.get(0).release();
+ queueEntries.get(0).release();
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index af4bbd1731..8b678c4eb4 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -21,6 +21,8 @@
package org.apache.qpid.server.store;
+import java.util.ArrayList;
+import java.util.Collection;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.AMQException;
@@ -78,9 +80,9 @@ public class MessageStoreTest extends QpidTestCase
public static final String SELECTOR_VALUE = "Test = 'MST'";
public static final String LVQ_KEY = "MST-LVQ-KEY";
- private AMQShortString nonDurableExchangeName = new AMQShortString("MST-NonDurableDirectExchange");
- private AMQShortString directExchangeName = new AMQShortString("MST-DirectExchange");
- private AMQShortString topicExchangeName = new AMQShortString("MST-TopicExchange");
+ private String nonDurableExchangeName = "MST-NonDurableDirectExchange";
+ private String directExchangeName = "MST-DirectExchange";
+ private String topicExchangeName = "MST-TopicExchange";
private AMQShortString durablePriorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue-Durable");
private AMQShortString durableTopicQueueName = new AMQShortString("MST-TopicQueue-Durable");
@@ -365,12 +367,12 @@ public class MessageStoreTest extends QpidTestCase
*/
public void testExchangePersistence() throws Exception
{
- int origExchangeCount = getVirtualHost().getExchangeRegistry().getExchangeNames().size();
+ int origExchangeCount = getVirtualHost().getExchanges().size();
- Map<AMQShortString, Exchange> oldExchanges = createExchanges();
+ Map<String, Exchange> oldExchanges = createExchanges();
assertEquals("Incorrect number of exchanges registered before recovery",
- origExchangeCount + 3, getVirtualHost().getExchangeRegistry().getExchangeNames().size());
+ origExchangeCount + 3, getVirtualHost().getExchanges().size());
reloadVirtualHost();
@@ -385,31 +387,28 @@ public class MessageStoreTest extends QpidTestCase
*/
public void testDurableExchangeRemoval() throws Exception
{
- int origExchangeCount = getVirtualHost().getExchangeRegistry().getExchangeNames().size();
+ int origExchangeCount = getVirtualHost().getExchanges().size();
createExchange(DirectExchange.TYPE, directExchangeName, true);
- ExchangeRegistry exchangeRegistry = getVirtualHost().getExchangeRegistry();
assertEquals("Incorrect number of exchanges registered before recovery",
- origExchangeCount + 1, exchangeRegistry.getExchangeNames().size());
+ origExchangeCount + 1, getVirtualHost().getExchanges().size());
reloadVirtualHost();
- exchangeRegistry = getVirtualHost().getExchangeRegistry();
assertEquals("Incorrect number of exchanges registered after first recovery",
- origExchangeCount + 1, exchangeRegistry.getExchangeNames().size());
+ origExchangeCount + 1, getVirtualHost().getExchanges().size());
//test that removing the exchange means it is not recovered next time
- final Exchange exchange = exchangeRegistry.getExchange(directExchangeName);
+ final Exchange exchange = getVirtualHost().getExchange(directExchangeName);
DurableConfigurationStoreHelper.removeExchange(getVirtualHost().getDurableConfigurationStore(), exchange);
reloadVirtualHost();
- exchangeRegistry = getVirtualHost().getExchangeRegistry();
assertEquals("Incorrect number of exchanges registered after second recovery",
- origExchangeCount, exchangeRegistry.getExchangeNames().size());
+ origExchangeCount, getVirtualHost().getExchanges().size());
assertNull("Durable exchange was not removed:" + directExchangeName,
- exchangeRegistry.getExchange(directExchangeName));
+ getVirtualHost().getExchange(directExchangeName));
}
/**
@@ -420,12 +419,12 @@ public class MessageStoreTest extends QpidTestCase
*/
public void testBindingPersistence() throws Exception
{
- int origExchangeCount = getVirtualHost().getExchangeRegistry().getExchangeNames().size();
+ int origExchangeCount = getVirtualHost().getExchanges().size();
createAllQueues();
createAllTopicQueues();
- Map<AMQShortString, Exchange> exchanges = createExchanges();
+ Map<String, Exchange> exchanges = createExchanges();
Exchange nonDurableExchange = exchanges.get(nonDurableExchangeName);
Exchange directExchange = exchanges.get(directExchangeName);
@@ -436,7 +435,7 @@ public class MessageStoreTest extends QpidTestCase
bindAllTopicQueuesToExchange(topicExchange, topicRouting);
assertEquals("Incorrect number of exchanges registered before recovery",
- origExchangeCount + 3, getVirtualHost().getExchangeRegistry().getExchangeNames().size());
+ origExchangeCount + 3, getVirtualHost().getExchanges().size());
reloadVirtualHost();
@@ -469,8 +468,7 @@ public class MessageStoreTest extends QpidTestCase
assertEquals("Incorrect number of bindings registered after first recovery",
1, queueRegistry.getQueue(durableQueueName).getBindings().size());
- ExchangeRegistry exchangeRegistry = getVirtualHost().getExchangeRegistry();
- exch = exchangeRegistry.getExchange(directExchangeName);
+ exch = getVirtualHost().getExchange(directExchangeName);
assertNotNull("Exchange was not recovered", exch);
//remove the binding and verify result after recovery
@@ -488,26 +486,30 @@ public class MessageStoreTest extends QpidTestCase
* and that the new exchanges are not the same objects as the provided list (i.e. that the
* reload actually generated new exchange objects)
*/
- private void validateExchanges(int originalNumExchanges, Map<AMQShortString, Exchange> oldExchanges)
+ private void validateExchanges(int originalNumExchanges, Map<String, Exchange> oldExchanges)
{
- ExchangeRegistry registry = getVirtualHost().getExchangeRegistry();
-
+ Collection<Exchange> exchanges = getVirtualHost().getExchanges();
+ Collection<String> exchangeNames = new ArrayList(exchanges.size());
+ for(Exchange exchange : exchanges)
+ {
+ exchangeNames.add(exchange.getName());
+ }
assertTrue(directExchangeName + " exchange NOT reloaded",
- registry.getExchangeNames().contains(directExchangeName));
+ exchangeNames.contains(directExchangeName));
assertTrue(topicExchangeName + " exchange NOT reloaded",
- registry.getExchangeNames().contains(topicExchangeName));
+ exchangeNames.contains(topicExchangeName));
assertTrue(nonDurableExchangeName + " exchange reloaded",
- !registry.getExchangeNames().contains(nonDurableExchangeName));
+ !exchangeNames.contains(nonDurableExchangeName));
//check the old exchange objects are not the same as the new exchanges
assertTrue(directExchangeName + " exchange NOT reloaded",
- registry.getExchange(directExchangeName) != oldExchanges.get(directExchangeName));
+ getVirtualHost().getExchange(directExchangeName) != oldExchanges.get(directExchangeName));
assertTrue(topicExchangeName + " exchange NOT reloaded",
- registry.getExchange(topicExchangeName) != oldExchanges.get(topicExchangeName));
+ getVirtualHost().getExchange(topicExchangeName) != oldExchanges.get(topicExchangeName));
// There should only be the original exchanges + our 2 recovered durable exchanges
assertEquals("Incorrect number of exchanges available",
- originalNumExchanges + 2, registry.getExchangeNames().size());
+ originalNumExchanges + 2, getVirtualHost().getExchanges().size());
}
/** Validates the Durable queues and their properties are as expected following recovery */
@@ -771,9 +773,9 @@ public class MessageStoreTest extends QpidTestCase
}
- private Map<AMQShortString, Exchange> createExchanges()
+ private Map<String, Exchange> createExchanges()
{
- Map<AMQShortString, Exchange> exchanges = new HashMap<AMQShortString, Exchange>();
+ Map<String, Exchange> exchanges = new HashMap<String, Exchange>();
//Register non-durable DirectExchange
exchanges.put(nonDurableExchangeName, createExchange(DirectExchange.TYPE, nonDurableExchangeName, false));
@@ -785,32 +787,19 @@ public class MessageStoreTest extends QpidTestCase
return exchanges;
}
- private Exchange createExchange(ExchangeType<?> type, AMQShortString name, boolean durable)
+ private Exchange createExchange(ExchangeType<?> type, String name, boolean durable)
{
Exchange exchange = null;
try
{
- exchange = type.newInstance(UUIDGenerator.generateRandomUUID(), getVirtualHost(), name, durable, 0, false);
+ exchange = getVirtualHost().createExchange(null, name, type.getName().toString(), durable, false, null);
}
catch (AMQException e)
{
fail(e.getMessage());
}
- try
- {
- getVirtualHost().getExchangeRegistry().registerExchange(exchange);
- if (durable)
- {
- DurableConfigurationStoreHelper.createExchange(getVirtualHost().getDurableConfigurationStore(),
- exchange);
- }
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
return exchange;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
index 7811d04997..24cdf34360 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
@@ -190,7 +190,7 @@ public class BrokerTestHelper
when(info.getExchange()).thenReturn(exchangeNameAsShortString);
when(info.getRoutingKey()).thenReturn(rouningKey);
- Exchange exchange = channel.getVirtualHost().getExchangeRegistry().getExchange(exchangeName);
+ Exchange exchange = channel.getVirtualHost().getExchange(exchangeName);
for (int count = 0; count < numberOfMessages; count++)
{
channel.setPublishFrame(info, exchange);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
index 7552a653fe..78d55c42bf 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
@@ -20,12 +20,16 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.Collection;
import java.util.concurrent.ScheduledFuture;
+import org.apache.qpid.AMQException;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.AbstractExchange;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.SecurityManager;
@@ -77,16 +81,6 @@ public class MockVirtualHost implements VirtualHost
return null;
}
- public ExchangeFactory getExchangeFactory()
- {
- return null;
- }
-
- public ExchangeRegistry getExchangeRegistry()
- {
- return null;
- }
-
public int getHouseKeepingActiveCount()
{
return 0;
@@ -127,11 +121,56 @@ public class MockVirtualHost implements VirtualHost
return null;
}
+ @Override
+ public Exchange createExchange(UUID id,
+ String exchange,
+ String type,
+ boolean durable,
+ boolean autoDelete,
+ String alternateExchange) throws AMQException
+ {
+ return null;
+ }
+
+ @Override
+ public void removeExchange(Exchange exchange, boolean force) throws AMQException
+ {
+ }
+
+ @Override
+ public Exchange getExchange(String name)
+ {
+ return null;
+ }
+
+ @Override
+ public Exchange getDefaultExchange()
+ {
+ return null;
+ }
+
+ @Override
+ public Collection<Exchange> getExchanges()
+ {
+ return null;
+ }
+
+ @Override
+ public Collection<ExchangeType<? extends Exchange>> getExchangeTypes()
+ {
+ return null;
+ }
+
public SecurityManager getSecurityManager()
{
return null;
}
+ @Override
+ public void addVirtualHostListener(VirtualHostListener listener)
+ {
+ }
+
public LinkRegistry getLinkRegistry(String remoteContainerId)
{
return null;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
index ae09e8d7e7..6b8ea0e80b 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
@@ -200,7 +200,7 @@ public class StandardVirtualHostTest extends QpidTestCase
File config = writeConfigFile(vhostName, queueName, exchangeName, false, new String[]{"ping","pong"}, bindingArguments);
VirtualHost vhost = createVirtualHost(vhostName, config);
- Exchange exch = vhost.getExchangeRegistry().getExchange(getName() +".direct");
+ Exchange exch = vhost.getExchange(getName() +".direct");
Collection<Binding> bindings = exch.getBindings();
assertNotNull("Bindings cannot be null", bindings);
assertEquals("Unexpected number of bindings", 3, bindings.size());
@@ -245,10 +245,10 @@ public class StandardVirtualHostTest extends QpidTestCase
AMQQueue queue = vhost.getQueueRegistry().getQueue(queueName);
assertNotNull("queue should exist", queue);
- Exchange defaultExch = vhost.getExchangeRegistry().getDefaultExchange();
+ Exchange defaultExch = vhost.getDefaultExchange();
assertTrue("queue should have been bound to default exchange with its name", defaultExch.isBound(queueName, queue));
- Exchange exch = vhost.getExchangeRegistry().getExchange(exchangeName);
+ Exchange exch = vhost.getExchange(exchangeName);
assertTrue("queue should have been bound to " + exchangeName + " with its name", exch.isBound(queueName, queue));
for(String key: routingKeys)